[BEAM-8661] Move remaining modules to per-module loggers
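
Each touched module now creates a single logger at import time, named after
the module, instead of logging through the root logger. A minimal sketch of
the pattern applied throughout this change (the worker function and the
level tuning below are hypothetical, not part of the diff):

    import logging

    # One logger per module, tagged with the module's import path.
    _LOGGER = logging.getLogger(__name__)

    def do_work():  # hypothetical caller
      # Emitted records carry a name like 'apache_beam.utils.retry', so
      # handlers and levels can be configured per module.
      _LOGGER.info('doing work')

    # Hypothetical: quiet a single chatty module without touching the rest.
    logging.getLogger('apache_beam.transforms.core').setLevel(logging.WARNING)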

diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py
index 5142a34..8a94acf 100644
--- a/sdks/python/apache_beam/internal/gcp/auth.py
+++ b/sdks/python/apache_beam/internal/gcp/auth.py
@@ -40,6 +40,10 @@
 # information.
 executing_project = None
 
+
+_LOGGER = logging.getLogger(__name__)
+
+
 if GceAssertionCredentials is not None:
   class _GceAssertionCredentials(GceAssertionCredentials):
     """GceAssertionCredentials with retry wrapper.
@@ -101,9 +105,9 @@
-      # apitools use urllib with the global timeout. Set it to 60 seconds
-      # to prevent network related stuckness issues.
+      # apitools uses urllib with the global timeout. Set it to 60 seconds
+      # to prevent network-related hangs.
       if not socket.getdefaulttimeout():
-        logging.info("Setting socket default timeout to 60 seconds.")
+        _LOGGER.info("Setting socket default timeout to 60 seconds.")
         socket.setdefaulttimeout(60)
-      logging.info(
+      _LOGGER.info(
           "socket default timeout is %s seconds.", socket.getdefaulttimeout())
 
       cls._credentials = cls._get_service_credentials()
@@ -131,7 +135,7 @@
                       'Credentials.')
         return credentials
       except Exception as e:
-        logging.warning(
+        _LOGGER.warning(
             'Unable to find default credentials to use: %s\n'
             'Connecting anonymously.', e)
         return None
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index b92ee44..4de4b51 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -48,6 +48,9 @@
     ]
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 def _static_value_provider_of(value_type):
   """"Helper function to plug a ValueProvider into argparse.
 
@@ -262,7 +265,7 @@
       add_extra_args_fn(parser)
     known_args, unknown_args = parser.parse_known_args(self._flags)
     if unknown_args:
-      logging.warning("Discarding unparseable args: %s", unknown_args)
+      _LOGGER.warning("Discarding unparseable args: %s", unknown_args)
     result = vars(known_args)
 
     # Apply the overrides if any
@@ -510,7 +513,7 @@
     """
     environment_region = os.environ.get('CLOUDSDK_COMPUTE_REGION')
     if environment_region:
-      logging.info('Using default GCP region %s from $CLOUDSDK_COMPUTE_REGION',
+      _LOGGER.info('Using default GCP region %s from $CLOUDSDK_COMPUTE_REGION',
                    environment_region)
       return environment_region
     try:
@@ -523,12 +526,12 @@
       raw_output = processes.check_output(cmd, stderr=DEVNULL)
       formatted_output = raw_output.decode('utf-8').strip()
       if formatted_output:
-        logging.info('Using default GCP region %s from `%s`',
+        _LOGGER.info('Using default GCP region %s from `%s`',
                      formatted_output, ' '.join(cmd))
         return formatted_output
     except RuntimeError:
       pass
-    logging.warning(
+    _LOGGER.warning(
         '--region not set; will default to us-central1. Future releases of '
         'Beam will require the user to set --region explicitly, or else have a '
         'default set via the gcloud tool. '
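
One way to avoid the fallback warned about above is to set --region
explicitly; a hedged sketch (the region value is illustrative):

    from apache_beam.options.pipeline_options import PipelineOptions

    # Equivalent to passing --region on the command line; skips the
    # us-central1 default and the warning above.
    options = PipelineOptions(['--region', 'us-west1'])  # illustrative region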
diff --git a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py
index 688d602..916faa4 100644
--- a/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py
+++ b/sdks/python/apache_beam/testing/benchmarks/nexmark/nexmark_util.py
@@ -41,6 +41,8 @@
 import apache_beam as beam
 from apache_beam.testing.benchmarks.nexmark.models import nexmark_model
 
+_LOGGER = logging.getLogger(__name__)
+
 
 class Command(object):
   def __init__(self, cmd, args):
@@ -53,7 +55,7 @@
                     timeout, self.cmd.__name__)
 
       self.cmd(*self.args)
-      logging.info('%d seconds elapsed. Thread (%s) finished.',
+      _LOGGER.info('%d seconds elapsed. Thread (%s) finished.',
                    timeout, self.cmd.__name__)
 
     thread = threading.Thread(target=thread_target, name='Thread-timeout')
diff --git a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
index c5c5259..2f22b02 100644
--- a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
+++ b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
@@ -72,6 +72,8 @@
     }
 ]
 
+_LOGGER = logging.getLogger(__name__)
+
 
 def parse_step(step_name):
   """Replaces white spaces and removes 'Step:' label
@@ -330,13 +332,13 @@
     if len(results) > 0:
       log = "Load test results for test: %s and timestamp: %s:" \
             % (results[0][ID_LABEL], results[0][SUBMIT_TIMESTAMP_LABEL])
-      logging.info(log)
+      _LOGGER.info(log)
       for result in results:
         log = "Metric: %s Value: %d" \
               % (result[METRICS_TYPE_LABEL], result[VALUE_LABEL])
-        logging.info(log)
+        _LOGGER.info(log)
     else:
-      logging.info("No test results were collected.")
+      _LOGGER.info("No test results were collected.")
 
 
 class BigQueryMetricsPublisher(object):
@@ -351,7 +353,7 @@
       for output in outputs:
         errors = output['errors']
         for err in errors:
-          logging.error(err['message'])
-          raise ValueError(
-              'Unable save rows in BigQuery: {}'.format(err['message']))
+          _LOGGER.error(err['message'])
+          raise ValueError(
+              'Unable to save rows in BigQuery: {}'.format(err['message']))
 
diff --git a/sdks/python/apache_beam/testing/pipeline_verifiers.py b/sdks/python/apache_beam/testing/pipeline_verifiers.py
index 1178672..cf99541 100644
--- a/sdks/python/apache_beam/testing/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/testing/pipeline_verifiers.py
@@ -48,6 +48,8 @@
 
 MAX_RETRIES = 4
 
+_LOGGER = logging.getLogger(__name__)
+
 
 class PipelineStateMatcher(BaseMatcher):
   """Matcher that verify pipeline job terminated in expected state
@@ -121,7 +123,7 @@
     if not matched_path:
       raise IOError('No such file or directory: %s' % self.file_path)
 
-    logging.info('Find %d files in %s: \n%s',
+    _LOGGER.info('Found %d files in %s: \n%s',
                  len(matched_path), self.file_path, '\n'.join(matched_path))
     for path in matched_path:
       with FileSystems.open(path, 'r') as f:
@@ -132,7 +134,7 @@
   def _matches(self, _):
     if self.sleep_secs:
       # Wait to have output file ready on FS
-      logging.info('Wait %d seconds...', self.sleep_secs)
+      _LOGGER.info('Wait %d seconds...', self.sleep_secs)
       time.sleep(self.sleep_secs)
 
     # Read from given file(s) path
@@ -140,7 +142,7 @@
 
     # Compute checksum
     self.checksum = utils.compute_hash(read_lines)
-    logging.info('Read from given path %s, %d lines, checksum: %s.',
+    _LOGGER.info('Read from given path %s, %d lines, checksum: %s.',
                  self.file_path, len(read_lines), self.checksum)
     return self.checksum == self.expected_checksum
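
The matcher above is designed for PyHamcrest's assert_that; a hedged usage
sketch, assuming the constructor takes the file path, expected checksum and
optional sleep_secs suggested by the attributes above:

    from hamcrest import assert_that
    from apache_beam.testing.pipeline_verifiers import FileChecksumMatcher

    # Hypothetical values: waits, reads /tmp/out*, hashes the lines and
    # compares against the expected checksum, as _matches does above.
    assert_that(None, FileChecksumMatcher('/tmp/out*', 'a1b2c3', sleep_secs=5))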
 
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 06fd201..b12e9d6 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -101,6 +101,8 @@
 K = typing.TypeVar('K')
 V = typing.TypeVar('V')
 
+_LOGGER = logging.getLogger(__name__)
+
 
 class DoFnContext(object):
   """A context available to all methods of DoFn instance."""
@@ -502,7 +504,7 @@
       try:
         callback()
       except Exception as e:
-        logging.warning("Got exception from finalization call: %s", e)
+        _LOGGER.warning("Got exception from finalization call: %s", e)
 
   def has_callbacks(self):
     return len(self._callbacks) > 0
@@ -747,7 +749,7 @@
       type_hints = type_hints.strip_iterable()
     except ValueError as e:
       # TODO(BEAM-8466): Raise exception here if using stricter type checking.
-      logging.warning('%s: %s', self.display_data()['fn'].value, e)
+      _LOGGER.warning('%s: %s', self.display_data()['fn'].value, e)
     return type_hints
 
   def infer_output_type(self, input_type):
@@ -1218,7 +1220,7 @@
         key_coder = coders.registry.get_coder(typehints.Any)
 
       if not key_coder.is_deterministic():
-        logging.warning(
+        _LOGGER.warning(
             'Key coder %s for transform %s with stateful DoFn may not '
             'be deterministic. This may cause incorrect behavior for complex '
             'key types. Consider adding an input type hint for this transform.',
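
A hedged sketch of the remedy the warning suggests, declaring the input type
so the key coder is deterministic (the DoFn and the types are illustrative,
not taken from this diff):

    import typing
    import apache_beam as beam
    from apache_beam.coders import coders
    from apache_beam.transforms import userstate

    class BufferFn(beam.DoFn):  # hypothetical stateful DoFn
      BUFFER = userstate.BagStateSpec('buffer', coders.VarIntCoder())

      def process(self, element, buffer=beam.DoFn.StateParam(BUFFER)):
        _, value = element
        buffer.add(value)
        yield element

    # With a Tuple[str, int] hint the key coder is derived from str, which
    # is deterministic, so the warning above is not emitted:
    # pcoll | beam.ParDo(BufferFn()).with_input_types(typing.Tuple[str, int])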
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 9761da2..e5bc20d 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -67,6 +67,9 @@
     ]
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 class AccumulationMode(object):
   """Controls what to do with data when a trigger fires multiple times."""
   DISCARDING = beam_runner_api_pb2.AccumulationMode.DISCARDING
@@ -1190,7 +1193,7 @@
             window, self.NONSPECULATIVE_INDEX)
         state.add_state(window, self.NONSPECULATIVE_INDEX, 1)
         windowed_value.PaneInfoTiming.LATE
-        logging.warning('Watermark moved backwards in time '
+        _LOGGER.warning('Watermark moved backwards in time '
                         'or late data moved window end forward.')
     else:
       nonspeculative_index = state.get_state(window, self.NONSPECULATIVE_INDEX)
@@ -1320,7 +1323,7 @@
         elif time_domain == TimeDomain.WATERMARK:
           time_marker = watermark
         else:
-          logging.error(
+          _LOGGER.error(
               'TimeDomain error: No timers defined for time domain %s.',
               time_domain)
         if timestamp <= time_marker:
diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py
index 6062e6f..b64e020 100644
--- a/sdks/python/apache_beam/typehints/typehints.py
+++ b/sdks/python/apache_beam/typehints/typehints.py
@@ -96,6 +96,8 @@
 # to templated (upper-case) versions instead.
 DISALLOWED_PRIMITIVE_TYPES = (list, set, tuple, dict)
 
+_LOGGER = logging.getLogger(__name__)
+
 
 class SimpleTypeHintError(TypeError):
   pass
@@ -1086,9 +1088,9 @@
     if isinstance(type_params, tuple) and len(type_params) == 3:
       yield_type, send_type, return_type = type_params
       if send_type is not None:
-        logging.warning('Ignoring send_type hint: %s' % send_type)
-      if send_type is not None:
-        logging.warning('Ignoring return_type hint: %s' % return_type)
+        _LOGGER.warning('Ignoring send_type hint: %s', send_type)
+      if return_type is not None:
+        _LOGGER.warning('Ignoring return_type hint: %s', return_type)
     else:
       yield_type = type_params
     return self.IteratorTypeConstraint(yield_type)
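
For reference, the three-parameter branch above is reached by
generator-style subscripts; a small sketch (the element type is
illustrative):

    from apache_beam.typehints import typehints

    # Iterator[yield, send, return]: only the yield type is kept; non-None
    # send or return parts trigger the warnings above.
    hint = typehints.Iterator[int, None, None]  # behaves as Iterator[int]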
diff --git a/sdks/python/apache_beam/utils/profiler.py b/sdks/python/apache_beam/utils/profiler.py
index 0606744..c6f7295 100644
--- a/sdks/python/apache_beam/utils/profiler.py
+++ b/sdks/python/apache_beam/utils/profiler.py
@@ -36,6 +36,8 @@
 
 from apache_beam.io import filesystems
 
+_LOGGER = logging.getLogger(__name__)
+
 
 class Profile(object):
   """cProfile wrapper context for saving and logging profiler results."""
@@ -53,14 +55,14 @@
     self.profile_output = None
 
   def __enter__(self):
-    logging.info('Start profiling: %s', self.profile_id)
+    _LOGGER.info('Start profiling: %s', self.profile_id)
     self.profile = cProfile.Profile()
     self.profile.enable()
     return self
 
   def __exit__(self, *args):
     self.profile.disable()
-    logging.info('Stop profiling: %s', self.profile_id)
+    _LOGGER.info('Stop profiling: %s', self.profile_id)
 
     if self.profile_location:
       dump_location = os.path.join(
@@ -70,7 +72,7 @@
       try:
         os.close(fd)
         self.profile.dump_stats(filename)
-        logging.info('Copying profiler data to: [%s]', dump_location)
+        _LOGGER.info('Copying profiler data to: [%s]', dump_location)
         self.file_copy_fn(filename, dump_location)
       finally:
         os.remove(filename)
@@ -81,7 +83,7 @@
       self.stats = pstats.Stats(
           self.profile, stream=s).sort_stats(Profile.SORTBY)
       self.stats.print_stats()
-      logging.info('Profiler data: [%s]', s.getvalue())
+      _LOGGER.info('Profiler data: [%s]', s.getvalue())
 
   @staticmethod
   def default_file_copy_fn(src, dest):
@@ -176,5 +178,5 @@
       return
     report_start_time = time.time()
     heap_profile = self._hpy().heap()
-    logging.info('*** MemoryReport Heap:\n %s\n MemoryReport took %.1f seconds',
+    _LOGGER.info('*** MemoryReport Heap:\n %s\n MemoryReport took %.1f seconds',
                  heap_profile, time.time() - report_start_time)
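
A hedged usage sketch of the Profile context manager above, assuming the
constructor takes a profile id plus the optional profile_location shown in
the hunk (both values are illustrative):

    from apache_beam.utils.profiler import Profile

    # Logs 'Start profiling'/'Stop profiling' via the module logger and
    # dumps cProfile stats under /tmp/profiles.
    with Profile('my-step', profile_location='/tmp/profiles'):  # illustrative
      sum(i * i for i in range(10 ** 6))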
diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py
index 59d8dec..e34e364 100644
--- a/sdks/python/apache_beam/utils/retry.py
+++ b/sdks/python/apache_beam/utils/retry.py
@@ -51,6 +51,9 @@
 # pylint: enable=wrong-import-order, wrong-import-position
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 class PermanentException(Exception):
   """Base class for exceptions that should not be retried."""
   pass
@@ -153,7 +156,7 @@
 
 
 def with_exponential_backoff(
-    num_retries=7, initial_delay_secs=5.0, logger=logging.warning,
+    num_retries=7, initial_delay_secs=5.0, logger=_LOGGER.warning,
     retry_filter=retry_on_server_errors_filter,
     clock=Clock(), fuzz=True, factor=2, max_delay_secs=60 * 60):
   """Decorator with arguments that control the retry logic.
@@ -163,7 +166,7 @@
     initial_delay_secs: The delay before the first retry, in seconds.
     logger: A callable used to report an exception. Must have the same signature
       as functions in the standard logging module. The default is
-      logging.warning.
+      _LOGGER.warning.
     retry_filter: A callable getting the exception raised and returning True
       if the retry should happen. For instance we do not want to retry on
       404 Http errors most of the time. The default value will return true
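
A hedged usage sketch of the decorator documented above (the function and
its failure mode are hypothetical):

    from apache_beam.utils import retry

    @retry.with_exponential_backoff(
        num_retries=3, initial_delay_secs=1.0,
        retry_filter=lambda exn: isinstance(exn, IOError))
    def flaky_fetch():  # hypothetical flaky call
      # Retried up to three times with exponentially growing, fuzzed delays;
      # retry_filter decides which exceptions are worth retrying.
      raise IOError('transient failure')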
diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py
index 65dbcae..fd55f18 100644
--- a/sdks/python/apache_beam/utils/subprocess_server.py
+++ b/sdks/python/apache_beam/utils/subprocess_server.py
@@ -33,6 +33,8 @@
 
 from apache_beam.version import __version__ as beam_version
 
+_LOGGER = logging.getLogger(__name__)
+
 
 class SubprocessServer(object):
   """An abstract base class for running GRPC Servers as an external process.
@@ -78,7 +80,7 @@
         port, = pick_port(None)
         cmd = [arg.replace('{{PORT}}', str(port)) for arg in self._cmd]
       endpoint = 'localhost:%s' % port
-      logging.warning("Starting service with %s", str(cmd).replace("',", "'"))
+      _LOGGER.warning("Starting service with %s", str(cmd).replace("',", "'"))
       try:
         self._process = subprocess.Popen(cmd)
         wait_secs = .1
@@ -86,7 +88,7 @@
         channel_ready = grpc.channel_ready_future(channel)
         while True:
           if self._process.poll() is not None:
-            logging.error("Starting job service with %s", cmd)
+            _LOGGER.error("Starting job service with %s", cmd)
             raise RuntimeError(
                 'Service failed to start up with error %s' %
                 self._process.poll())
@@ -100,7 +102,7 @@
                         endpoint)
         return self._stub_class(channel)
       except:  # pylint: disable=bare-except
-        logging.exception("Error bringing up service")
+        _LOGGER.exception("Error bringing up service")
         self.stop()
         raise
 
@@ -169,7 +171,7 @@
             classifier='SNAPSHOT',
             appendix=appendix))
     if os.path.exists(local_path):
-      logging.info('Using pre-built snapshot at %s', local_path)
+      _LOGGER.info('Using pre-built snapshot at %s', local_path)
       return local_path
     elif '.dev' in beam_version:
       # TODO: Attempt to use nightly snapshots?
@@ -187,7 +189,7 @@
     if os.path.exists(url):
       return url
     else:
-      logging.warning('Downloading job server jar from %s' % url)
+      _LOGGER.warning('Downloading job server jar from %s' % url)
       cached_jar = os.path.join(cls.JAR_CACHE, os.path.basename(url))
       if not os.path.exists(cached_jar):
         if not os.path.exists(cls.JAR_CACHE):
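
Taken together with the SubprocessServer hunks above, a heavily hedged
construction sketch (the command is hypothetical, the stub factory is a
stand-in, and the argument order is assumed rather than shown in the diff):

    from apache_beam.utils.subprocess_server import SubprocessServer

    server = SubprocessServer(
        lambda channel: channel,  # hypothetical stand-in for a GRPC stub class
        ['java', '-jar', 'service.jar', '--port={{PORT}}'])  # hypothetical cmd
    stub = server.start()  # waits for the channel as in the hunk above
    server.stop()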