[BEAM-8661] Move the io module to per-module loggers
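
Replace direct use of the root logging module with a per-module logger
(_LOGGER = logging.getLogger(__name__)) throughout apache_beam/io, so that
log records carry the emitting module's name and verbosity can be tuned per
module. A minimal sketch of the pattern, using an illustrative message and
values (the module name in the comment is one of the files touched below):

    import logging

    # Logger named after the importing module, e.g. "apache_beam.io.gcsio".
    # Records still propagate to the root logger, so existing handlers and
    # logging.basicConfig() setups keep working unchanged.
    _LOGGER = logging.getLogger(__name__)

    _LOGGER.info('Finished listing %s files in %s seconds.', 10, 1.2)

This also lets users adjust a single module's verbosity, for example:
logging.getLogger('apache_beam.io.gcsio').setLevel(logging.WARNING).
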
diff --git a/sdks/python/apache_beam/io/filebasedsink.py b/sdks/python/apache_beam/io/filebasedsink.py
index 3ff420f..76143dd 100644
--- a/sdks/python/apache_beam/io/filebasedsink.py
+++ b/sdks/python/apache_beam/io/filebasedsink.py
@@ -45,6 +45,9 @@
 __all__ = ['FileBasedSink']
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 class FileBasedSink(iobase.Sink):
   """A sink to a GCS or local files.
 
@@ -210,7 +213,7 @@
                       for file_metadata in mr.metadata_list]
 
     if dst_glob_files:
-      logging.warning('Deleting %d existing files in target path matching: %s',
+      _LOGGER.warning('Deleting %d existing files in target path matching: %s',
                       len(dst_glob_files), self.shard_name_glob_format)
       FileSystems.delete(dst_glob_files)
 
@@ -250,12 +253,12 @@
         raise BeamIOError('src and dst files do not exist. src: %s, dst: %s' % (
             src, dst))
       if not src_exists and dst_exists:
-        logging.debug('src: %s -> dst: %s already renamed, skipping', src, dst)
+        _LOGGER.debug('src: %s -> dst: %s already renamed, skipping', src, dst)
         num_skipped += 1
         continue
       if (src_exists and dst_exists and
           FileSystems.checksum(src) == FileSystems.checksum(dst)):
-        logging.debug('src: %s == dst: %s, deleting src', src, dst)
+        _LOGGER.debug('src: %s == dst: %s, deleting src', src, dst)
         delete_files.append(src)
         continue
 
@@ -284,7 +287,7 @@
                               for i in range(0, len(dst_files), chunk_size)]
 
     if num_shards_to_finalize:
-      logging.info(
+      _LOGGER.info(
           'Starting finalize_write threads with num_shards: %d (skipped: %d), '
           'batches: %d, num_threads: %d',
           num_shards_to_finalize, num_skipped, len(source_file_batch),
@@ -304,11 +307,11 @@
             raise
           for (src, dst), exception in iteritems(exp.exception_details):
             if exception:
-              logging.error(('Exception in _rename_batch. src: %s, '
+              _LOGGER.error(('Exception in _rename_batch. src: %s, '
                              'dst: %s, err: %s'), src, dst, exception)
               exceptions.append(exception)
             else:
-              logging.debug('Rename successful: %s -> %s', src, dst)
+              _LOGGER.debug('Rename successful: %s -> %s', src, dst)
           return exceptions
 
       exception_batches = util.run_using_threadpool(
@@ -324,10 +327,10 @@
       for final_name in dst_files:
         yield final_name
 
-      logging.info('Renamed %d shards in %.2f seconds.', num_shards_to_finalize,
+      _LOGGER.info('Renamed %d shards in %.2f seconds.', num_shards_to_finalize,
                    time.time() - start_time)
     else:
-      logging.warning(
+      _LOGGER.warning(
           'No shards found to finalize. num_shards: %d, skipped: %d',
           num_shards, num_skipped)
 
diff --git a/sdks/python/apache_beam/io/filebasedsink_test.py b/sdks/python/apache_beam/io/filebasedsink_test.py
index 07d0e8e..1cc3d74 100644
--- a/sdks/python/apache_beam/io/filebasedsink_test.py
+++ b/sdks/python/apache_beam/io/filebasedsink_test.py
@@ -44,6 +44,9 @@
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 # TODO: Refactor code so all io tests are using same library
 # TestCaseWithTempDirCleanup class.
 class _TestCaseWithTempDirCleanUp(unittest.TestCase):
@@ -247,7 +250,7 @@
           'gs://aaa/bbb', 'gs://aaa/bbb/', 'gs://aaa', 'gs://aaa/', 'gs://',
           '/')
     except ValueError:
-      logging.debug('Ignoring test since GCP module is not installed')
+      _LOGGER.debug('Ignoring test since GCP module is not installed')
 
   @mock.patch('apache_beam.io.localfilesystem.os')
   def test_temp_dir_local(self, filesystem_os_mock):
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 1d9fcdd..14c35bc 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -114,6 +114,9 @@
            'ReadMatches']
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 class EmptyMatchTreatment(object):
   """How to treat empty matches in ``MatchAll`` and ``MatchFiles`` transforms.
 
@@ -479,7 +482,7 @@
           str,
           filesystems.FileSystems.join(temp_location,
                                        '.temp%s' % dir_uid))
-      logging.info('Added temporary directory %s', self._temp_directory.get())
+      _LOGGER.info('Added temporary directory %s', self._temp_directory.get())
 
     output = (pcoll
               | beam.ParDo(_WriteUnshardedRecordsFn(
@@ -557,7 +560,7 @@
                                             '',
                                             destination)
 
-      logging.info('Moving temporary file %s to dir: %s as %s. Res: %s',
+      _LOGGER.info('Moving temporary file %s to dir: %s as %s. Res: %s',
                    r.file_name, self.path.get(), final_file_name, r)
 
       final_full_path = filesystems.FileSystems.join(self.path.get(),
@@ -570,7 +573,7 @@
       except BeamIOError:
         # This error is not serious, because it may happen on a retry of the
         # bundle. We simply log it.
-        logging.debug('File %s failed to be copied. This may be due to a bundle'
+        _LOGGER.debug('File %s failed to be copied. This may be due to a bundle'
                       ' being retried.', r.file_name)
 
       yield FileResult(final_file_name,
@@ -580,7 +583,7 @@
                        r.pane,
                        destination)
 
-    logging.info('Cautiously removing temporary files for'
+    _LOGGER.info('Cautiously removing temporary files for'
                  ' destination %s and window %s', destination, w)
     writer_key = (destination, w)
     self._remove_temporary_files(writer_key)
@@ -592,10 +595,10 @@
       match_result = filesystems.FileSystems.match(['%s*' % prefix])
       orphaned_files = [m.path for m in match_result[0].metadata_list]
 
-      logging.debug('Deleting orphaned files: %s', orphaned_files)
+      _LOGGER.debug('Deleting orphaned files: %s', orphaned_files)
       filesystems.FileSystems.delete(orphaned_files)
     except BeamIOError as e:
-      logging.debug('Exceptions when deleting files: %s', e)
+      _LOGGER.debug('Exceptions when deleting files: %s', e)
 
 
 class _WriteShardedRecordsFn(beam.DoFn):
@@ -625,7 +628,7 @@
     sink.flush()
     writer.close()
 
-    logging.info('Writing file %s for destination %s and shard %s',
+    _LOGGER.info('Writing file %s for destination %s and shard %s',
                  full_file_name, destination, repr(shard))
 
     yield FileResult(full_file_name,
diff --git a/sdks/python/apache_beam/io/filesystemio_test.py b/sdks/python/apache_beam/io/filesystemio_test.py
index 72e7f0d..569dd14 100644
--- a/sdks/python/apache_beam/io/filesystemio_test.py
+++ b/sdks/python/apache_beam/io/filesystemio_test.py
@@ -29,6 +29,9 @@
 from apache_beam.io import filesystemio
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 class FakeDownloader(filesystemio.Downloader):
 
   def __init__(self, data):
@@ -206,7 +209,7 @@
 
     for buffer_size in buffer_sizes:
       for target in [self._read_and_verify, self._read_and_seek]:
-        logging.info('buffer_size=%s, target=%s' % (buffer_size, target))
+        _LOGGER.info('buffer_size=%s, target=%s' % (buffer_size, target))
         parent_conn, child_conn = multiprocessing.Pipe()
         stream = filesystemio.PipeStream(child_conn)
         success = [False]
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
index 21de828..d357946 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
@@ -45,6 +45,9 @@
 except ImportError:
   pass
 
+
+_LOGGER = logging.getLogger(__name__)
+
 WAIT_UNTIL_FINISH_DURATION_MS = 15 * 60 * 1000
 
 BIG_QUERY_DATASET_ID = 'python_query_to_table_'
@@ -90,7 +93,7 @@
     try:
       self.bigquery_client.client.datasets.Delete(request)
     except HttpError:
-      logging.debug('Failed to clean up dataset %s' % self.dataset_id)
+      _LOGGER.debug('Failed to clean up dataset %s' % self.dataset_id)
 
   def _setup_new_types_env(self):
     table_schema = bigquery.TableSchema()
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index d3ac5ca..0280c61 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -273,6 +273,9 @@
     ]
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 @deprecated(since='2.11.0', current="bigquery_tools.parse_table_reference")
 def _parse_table_reference(table, dataset=None, project=None):
   return bigquery_tools.parse_table_reference(table, dataset, project)
@@ -787,7 +790,7 @@
       # and avoid the get-or-create step.
       return
 
-    logging.debug('Creating or getting table %s with schema %s.',
+    _LOGGER.debug('Creating or getting table %s with schema %s.',
                   table_reference, schema)
 
     table_schema = self.get_table_schema(schema)
@@ -833,7 +836,7 @@
     return self._flush_all_batches()
 
   def _flush_all_batches(self):
-    logging.debug('Attempting to flush to all destinations. Total buffered: %s',
+    _LOGGER.debug('Attempting to flush to all destinations. Total buffered: %s',
                   self._total_buffered_rows)
 
     return itertools.chain(*[self._flush_batch(destination)
@@ -850,7 +853,7 @@
       table_reference.projectId = vp.RuntimeValueProvider.get_value(
           'project', str, '')
 
-    logging.debug('Flushing data to %s. Total %s rows.',
+    _LOGGER.debug('Flushing data to %s. Total %s rows.',
                   destination, len(rows_and_insert_ids))
 
     rows = [r[0] for r in rows_and_insert_ids]
@@ -865,7 +868,7 @@
           insert_ids=insert_ids,
           skip_invalid_rows=True)
 
-      logging.debug("Passed: %s. Errors are %s", passed, errors)
+      _LOGGER.debug("Passed: %s. Errors are %s", passed, errors)
       failed_rows = [rows[entry.index] for entry in errors]
       should_retry = any(
           bigquery_tools.RetryStrategy.should_retry(
@@ -877,7 +880,7 @@
         break
       else:
         retry_backoff = next(self._backoff_calculator)
-        logging.info('Sleeping %s seconds before retrying insertion.',
+        _LOGGER.info('Sleeping %s seconds before retrying insertion.',
                      retry_backoff)
         time.sleep(retry_backoff)
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index cb285ea..a6043b4 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -46,6 +46,9 @@
 from apache_beam.transforms import trigger
 from apache_beam.transforms.window import GlobalWindows
 
+
+_LOGGER = logging.getLogger(__name__)
+
 ONE_TERABYTE = (1 << 40)
 
 # The maximum file size for imports is 5TB. We keep our files under that.
@@ -320,7 +323,7 @@
                                copy_to_reference.datasetId,
                                copy_to_reference.tableId)))
 
-    logging.info("Triggering copy job from %s to %s",
+    _LOGGER.info("Triggering copy job from %s to %s",
                  copy_from_reference, copy_to_reference)
     job_reference = self.bq_wrapper._insert_copy_job(
         copy_to_reference.projectId,
@@ -407,7 +410,7 @@
     uid = _bq_uuid()
     job_name = '%s_%s_%s' % (
         load_job_name_prefix, destination_hash, uid)
-    logging.debug('Load job has %s files. Job name is %s.',
+    _LOGGER.debug('Load job has %s files. Job name is %s.',
                   len(files), job_name)
 
     if self.temporary_tables:
@@ -415,7 +418,7 @@
       table_reference.tableId = job_name
       yield pvalue.TaggedOutput(TriggerLoadJobs.TEMP_TABLES, table_reference)
 
-    logging.info('Triggering job %s to load data to BigQuery table %s.'
+    _LOGGER.info('Triggering job %s to load data to BigQuery table %s. '
                  'Schema: %s. Additional parameters: %s',
                  job_name, table_reference,
                  schema, additional_parameters)
@@ -519,9 +522,9 @@
                                     ref.jobId,
                                     ref.location)
 
-      logging.info("Job status: %s", job.status)
+      _LOGGER.info("Job status: %s", job.status)
       if job.status.state == 'DONE' and job.status.errorResult:
-        logging.warning("Job %s seems to have failed. Error Result: %s",
+        _LOGGER.warning("Job %s seems to have failed. Error Result: %s",
                         ref.jobId, job.status.errorResult)
         self._latest_error = job.status
         return WaitForBQJobs.FAILED
@@ -541,7 +544,7 @@
     self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client)
 
   def process(self, table_reference):
-    logging.info("Deleting table %s", table_reference)
+    _LOGGER.info("Deleting table %s", table_reference)
     table_reference = bigquery_tools.parse_table_reference(table_reference)
     self.bq_wrapper._delete_table(
         table_reference.projectId,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index 035be18..6a61cc2 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -58,6 +58,9 @@
   HttpError = None
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 _DESTINATION_ELEMENT_PAIRS = [
     # DESTINATION 1
     ('project1:dataset1.table1', '{"name":"beam", "language":"py"}'),
@@ -609,7 +612,7 @@
     self.bigquery_client = bigquery_tools.BigQueryWrapper()
     self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
     self.output_table = "%s.output_table" % (self.dataset_id)
-    logging.info("Created dataset %s in project %s",
+    _LOGGER.info("Created dataset %s in project %s",
                  self.dataset_id, self.project)
 
   @attr('IT')
@@ -794,11 +797,11 @@
         projectId=self.project, datasetId=self.dataset_id,
         deleteContents=True)
     try:
-      logging.info("Deleting dataset %s in project %s",
+      _LOGGER.info("Deleting dataset %s in project %s",
                    self.dataset_id, self.project)
       self.bigquery_client.client.datasets.Delete(request)
     except HttpError:
-      logging.debug('Failed to clean up dataset %s in project %s',
+      _LOGGER.debug('Failed to clean up dataset %s in project %s',
                     self.dataset_id, self.project)
 
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
index 246d2ce..ff63eda 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -46,6 +46,9 @@
 # pylint: enable=wrong-import-order, wrong-import-position
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 class BigQueryReadIntegrationTests(unittest.TestCase):
   BIG_QUERY_DATASET_ID = 'python_read_table_'
 
@@ -59,7 +62,7 @@
                                   str(int(time.time())),
                                   random.randint(0, 10000))
     self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
-    logging.info("Created dataset %s in project %s",
+    _LOGGER.info("Created dataset %s in project %s",
                  self.dataset_id, self.project)
 
   def tearDown(self):
@@ -67,11 +70,11 @@
         projectId=self.project, datasetId=self.dataset_id,
         deleteContents=True)
     try:
-      logging.info("Deleting dataset %s in project %s",
+      _LOGGER.info("Deleting dataset %s in project %s",
                    self.dataset_id, self.project)
       self.bigquery_client.client.datasets.Delete(request)
     except HttpError:
-      logging.debug('Failed to clean up dataset %s in project %s',
+      _LOGGER.debug('Failed to clean up dataset %s in project %s',
                     self.dataset_id, self.project)
 
   def create_table(self, tablename):
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index b8c8c1c..6cf4529 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -71,6 +71,9 @@
 # pylint: enable=wrong-import-order, wrong-import-position
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestTableRowJsonCoder(unittest.TestCase):
 
@@ -579,7 +582,7 @@
     self.bigquery_client = bigquery_tools.BigQueryWrapper()
     self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
     self.output_table = "%s.output_table" % (self.dataset_id)
-    logging.info("Created dataset %s in project %s",
+    _LOGGER.info("Created dataset %s in project %s",
                  self.dataset_id, self.project)
 
   @attr('IT')
@@ -741,11 +744,11 @@
         projectId=self.project, datasetId=self.dataset_id,
         deleteContents=True)
     try:
-      logging.info("Deleting dataset %s in project %s",
+      _LOGGER.info("Deleting dataset %s in project %s",
                    self.dataset_id, self.project)
       self.bigquery_client.client.datasets.Delete(request)
     except HttpError:
-      logging.debug('Failed to clean up dataset %s in project %s',
+      _LOGGER.debug('Failed to clean up dataset %s in project %s',
                     self.dataset_id, self.project)
 
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 0649703..8510294 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -62,6 +62,9 @@
 # pylint: enable=wrong-import-order, wrong-import-position
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 MAX_RETRIES = 3
 
 
@@ -262,7 +265,7 @@
 
     if response.statistics is None:
       # This behavior is only expected in tests
-      logging.warning(
+      _LOGGER.warning(
           "Unable to get location, missing response.statistics. Query: %s",
           query)
       return None
@@ -274,11 +277,11 @@
           table.projectId,
           table.datasetId,
           table.tableId)
-      logging.info("Using location %r from table %r referenced by query %s",
+      _LOGGER.info("Using location %r from table %r referenced by query %s",
                    location, table, query)
       return location
 
-    logging.debug("Query %s does not reference any tables.", query)
+    _LOGGER.debug("Query %s does not reference any tables.", query)
     return None
 
   @retry.with_exponential_backoff(
@@ -309,9 +312,9 @@
         )
     )
 
-    logging.info("Inserting job request: %s", request)
+    _LOGGER.info("Inserting job request: %s", request)
     response = self.client.jobs.Insert(request)
-    logging.info("Response was %s", response)
+    _LOGGER.info("Response was %s", response)
     return response.jobReference
 
   @retry.with_exponential_backoff(
@@ -442,7 +445,7 @@
     request = bigquery.BigqueryTablesInsertRequest(
         projectId=project_id, datasetId=dataset_id, table=table)
     response = self.client.tables.Insert(request)
-    logging.debug("Created the table with id %s", table_id)
+    _LOGGER.debug("Created the table with id %s", table_id)
     # The response is a bigquery.Table instance.
     return response
 
@@ -491,7 +494,7 @@
       self.client.tables.Delete(request)
     except HttpError as exn:
       if exn.status_code == 404:
-        logging.warning('Table %s:%s.%s does not exist', project_id,
+        _LOGGER.warning('Table %s:%s.%s does not exist', project_id,
                         dataset_id, table_id)
         return
       else:
@@ -508,7 +511,7 @@
       self.client.datasets.Delete(request)
     except HttpError as exn:
       if exn.status_code == 404:
-        logging.warning('Dataset %s:%s does not exist', project_id,
+        _LOGGER.warning('Dataset %s:%s does not exist', project_id,
                         dataset_id)
         return
       else:
@@ -537,7 +540,7 @@
             % (project_id, dataset_id))
     except HttpError as exn:
       if exn.status_code == 404:
-        logging.warning(
+        _LOGGER.warning(
             'Dataset %s:%s does not exist so we will create it as temporary '
             'with location=%s',
             project_id, dataset_id, location)
@@ -555,7 +558,7 @@
           projectId=project_id, datasetId=temp_table.datasetId))
     except HttpError as exn:
       if exn.status_code == 404:
-        logging.warning('Dataset %s:%s does not exist', project_id,
+        _LOGGER.warning('Dataset %s:%s does not exist', project_id,
                         temp_table.datasetId)
         return
       else:
@@ -669,12 +672,12 @@
             additional_parameters=additional_create_parameters)
       except HttpError as exn:
         if exn.status_code == 409:
-          logging.debug('Skipping Creation. Table %s:%s.%s already exists.'
+          _LOGGER.debug('Skipping Creation. Table %s:%s.%s already exists.'
                         % (project_id, dataset_id, table_id))
           created_table = self.get_table(project_id, dataset_id, table_id)
         else:
           raise
-      logging.info('Created table %s.%s.%s with schema %s. '
+      _LOGGER.info('Created table %s.%s.%s with schema %s. '
                    'Result: %s.',
                    project_id, dataset_id, table_id,
                    schema or found_table.schema,
@@ -684,7 +687,7 @@
       if write_disposition == BigQueryDisposition.WRITE_TRUNCATE:
         # BigQuery can route data to the old table for 2 mins max so wait
         # that much time before creating the table and writing it
-        logging.warning('Sleeping for 150 seconds before the write as ' +
+        _LOGGER.warning('Sleeping for 150 seconds before the write as ' +
                         'BigQuery inserts can be routed to deleted table ' +
                         'for 2 mins after the delete and create.')
         # TODO(BEAM-2673): Remove this sleep by migrating to load api
@@ -713,7 +716,7 @@
         # request not for the actual execution of the query in the service.  If
         # the request times out we keep trying. This situation is quite possible
         # if the query will return a large number of rows.
-        logging.info('Waiting on response from query: %s ...', query)
+        _LOGGER.info('Waiting on response from query: %s ...', query)
         time.sleep(1.0)
         continue
       # We got some results. The last page is signalled by a missing pageToken.
@@ -975,7 +978,7 @@
 
   def _flush_rows_buffer(self):
     if self.rows_buffer:
-      logging.info('Writing %d rows to %s:%s.%s table.', len(self.rows_buffer),
+      _LOGGER.info('Writing %d rows to %s:%s.%s table.', len(self.rows_buffer),
                    self.project_id, self.dataset_id, self.table_id)
       passed, errors = self.client.insert_rows(
           project_id=self.project_id, dataset_id=self.dataset_id,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index 3658b9c..ae56e35 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -48,6 +48,9 @@
 # pylint: enable=wrong-import-order, wrong-import-position
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 class BigQueryWriteIntegrationTests(unittest.TestCase):
   BIG_QUERY_DATASET_ID = 'python_write_to_table_'
 
@@ -61,7 +64,7 @@
                                   str(int(time.time())),
                                   random.randint(0, 10000))
     self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
-    logging.info("Created dataset %s in project %s",
+    _LOGGER.info("Created dataset %s in project %s",
                  self.dataset_id, self.project)
 
   def tearDown(self):
@@ -69,11 +72,11 @@
         projectId=self.project, datasetId=self.dataset_id,
         deleteContents=True)
     try:
-      logging.info("Deleting dataset %s in project %s",
+      _LOGGER.info("Deleting dataset %s in project %s",
                    self.dataset_id, self.project)
       self.bigquery_client.client.datasets.Delete(request)
     except HttpError:
-      logging.debug('Failed to clean up dataset %s in project %s',
+      _LOGGER.debug('Failed to clean up dataset %s in project %s',
                     self.dataset_id, self.project)
 
   def create_table(self, table_name):
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
index 3d32611..a6fc816 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
@@ -45,12 +45,16 @@
 from apache_beam.transforms import PTransform
 from apache_beam.transforms.util import Values
 
+
+_LOGGER = logging.getLogger(__name__)
+
+
 # Protect against environments where datastore library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
 try:
   from google.cloud.proto.datastore.v1 import datastore_pb2
   from googledatastore import helper as datastore_helper
-  logging.warning(
+  _LOGGER.warning(
       'Using deprecated Datastore client.\n'
       'This client will be removed in Beam 3.0 (next Beam major release).\n'
       'Please migrate to apache_beam.io.gcp.datastore.v1new.datastoreio.')
@@ -125,7 +129,7 @@
           'Google Cloud IO not available, '
           'please install apache_beam[gcp]')
 
-    logging.warning('datastoreio read transform is experimental.')
+    _LOGGER.warning('datastoreio read transform is experimental.')
     super(ReadFromDatastore, self).__init__()
 
     if not project:
@@ -213,13 +217,13 @@
       else:
         estimated_num_splits = self._num_splits
 
-      logging.info("Splitting the query into %d splits", estimated_num_splits)
+      _LOGGER.info("Splitting the query into %d splits", estimated_num_splits)
       try:
         query_splits = query_splitter.get_splits(
             self._datastore, query, estimated_num_splits,
             helper.make_partition(self._project, self._datastore_namespace))
       except Exception:
-        logging.warning("Unable to parallelize the given query: %s", query,
+        _LOGGER.warning("Unable to parallelize the given query: %s", query,
                         exc_info=True)
         query_splits = [query]
 
@@ -296,7 +300,7 @@
     kind = query.kind[0].name
     latest_timestamp = ReadFromDatastore.query_latest_statistics_timestamp(
         project, namespace, datastore)
-    logging.info('Latest stats timestamp for kind %s is %s',
+    _LOGGER.info('Latest stats timestamp for kind %s is %s',
                  kind, latest_timestamp)
 
     kind_stats_query = (
@@ -316,13 +320,13 @@
     try:
       estimated_size_bytes = ReadFromDatastore.get_estimated_size_bytes(
           project, namespace, query, datastore)
-      logging.info('Estimated size bytes for query: %s', estimated_size_bytes)
+      _LOGGER.info('Estimated size bytes for query: %s', estimated_size_bytes)
       num_splits = int(min(ReadFromDatastore._NUM_QUERY_SPLITS_MAX, round(
           (float(estimated_size_bytes) /
            ReadFromDatastore._DEFAULT_BUNDLE_SIZE_BYTES))))
 
     except Exception as e:
-      logging.warning('Failed to fetch estimated size bytes: %s', e)
+      _LOGGER.warning('Failed to fetch estimated size bytes: %s', e)
       # Fallback in case estimated size is unavailable.
       num_splits = ReadFromDatastore._NUM_QUERY_SPLITS_MIN
 
@@ -346,7 +350,7 @@
      """
     self._project = project
     self._mutation_fn = mutation_fn
-    logging.warning('datastoreio write transform is experimental.')
+    _LOGGER.warning('datastoreio write transform is experimental.')
 
   def expand(self, pcoll):
     return (pcoll
@@ -424,7 +428,7 @@
           self._datastore, self._project, self._mutations,
           self._throttler, self._update_rpc_stats,
           throttle_delay=util.WRITE_BATCH_TARGET_LATENCY_MS//1000)
-      logging.debug("Successfully wrote %d mutations in %dms.",
+      _LOGGER.debug("Successfully wrote %d mutations in %dms.",
                     len(self._mutations), latency_ms)
 
       if not self._fixed_batch_size:
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
index a2bc521..4ea2898 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
@@ -54,6 +54,9 @@
 # pylint: enable=ungrouped-imports
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 def key_comparator(k1, k2):
   """A comparator for Datastore keys.
 
@@ -216,7 +219,7 @@
   def commit(request):
     # Client-side throttling.
     while throttler.throttle_request(time.time()*1000):
-      logging.info("Delaying request for %ds due to previous failures",
+      _LOGGER.info("Delaying request for %ds due to previous failures",
                    throttle_delay)
       time.sleep(throttle_delay)
       rpc_stats_callback(throttled_secs=throttle_delay)
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1new/datastore_write_it_pipeline.py b/sdks/python/apache_beam/io/gcp/datastore/v1new/datastore_write_it_pipeline.py
index f5b1157..21d7781 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1new/datastore_write_it_pipeline.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1new/datastore_write_it_pipeline.py
@@ -48,6 +48,9 @@
 from apache_beam.testing.util import equal_to
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 def new_pipeline_with_job_name(pipeline_options, job_name, suffix):
   """Create a pipeline with the given job_name and a suffix."""
   gcp_options = pipeline_options.view_as(GoogleCloudOptions)
@@ -108,7 +111,7 @@
   # Pipeline 1: Create and write the specified number of Entities to the
   # Cloud Datastore.
   ancestor_key = Key([kind, str(uuid.uuid4())], project=project)
-  logging.info('Writing %s entities to %s', num_entities, project)
+  _LOGGER.info('Writing %s entities to %s', num_entities, project)
   p = new_pipeline_with_job_name(pipeline_options, job_name, '-write')
   _ = (p
        | 'Input' >> beam.Create(list(range(num_entities)))
@@ -121,7 +124,7 @@
   # Optional Pipeline 2: If a read limit was provided, read it and confirm
   # that the expected entities were read.
   if known_args.limit is not None:
-    logging.info('Querying a limited set of %s entities and verifying count.',
+    _LOGGER.info('Querying a limited set of %s entities and verifying count.',
                  known_args.limit)
     p = new_pipeline_with_job_name(pipeline_options, job_name, '-verify-limit')
     query.limit = known_args.limit
@@ -134,7 +137,7 @@
     query.limit = None
 
   # Pipeline 3: Query the written Entities and verify result.
-  logging.info('Querying entities, asserting they match.')
+  _LOGGER.info('Querying entities, asserting they match.')
   p = new_pipeline_with_job_name(pipeline_options, job_name, '-verify')
   entities = p | 'read from datastore' >> ReadFromDatastore(query)
 
@@ -145,7 +148,7 @@
   p.run()
 
   # Pipeline 4: Delete Entities.
-  logging.info('Deleting entities.')
+  _LOGGER.info('Deleting entities.')
   p = new_pipeline_with_job_name(pipeline_options, job_name, '-delete')
   entities = p | 'read from datastore' >> ReadFromDatastore(query)
   _ = (entities
@@ -155,7 +158,7 @@
   p.run()
 
   # Pipeline 5: Query the written Entities, verify no results.
-  logging.info('Querying for the entities to make sure there are none present.')
+  _LOGGER.info('Querying for the entities to make sure there are none present.')
   p = new_pipeline_with_job_name(pipeline_options, job_name, '-verify-deleted')
   entities = p | 'read from datastore' >> ReadFromDatastore(query)
 
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio.py
index f71a801..a70ea95 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio.py
@@ -51,6 +51,9 @@
 __all__ = ['ReadFromDatastore', 'WriteToDatastore', 'DeleteFromDatastore']
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 @typehints.with_output_types(types.Entity)
 class ReadFromDatastore(PTransform):
   """A ``PTransform`` for querying Google Cloud Datastore.
@@ -173,11 +176,11 @@
         else:
           estimated_num_splits = self._num_splits
 
-        logging.info("Splitting the query into %d splits", estimated_num_splits)
+        _LOGGER.info("Splitting the query into %d splits", estimated_num_splits)
         query_splits = query_splitter.get_splits(
             client, query, estimated_num_splits)
       except query_splitter.QuerySplitterError:
-        logging.info("Unable to parallelize the given query: %s", query,
+        _LOGGER.info("Unable to parallelize the given query: %s", query,
                      exc_info=True)
         query_splits = [query]
 
@@ -219,7 +222,7 @@
       latest_timestamp = (
           ReadFromDatastore._SplitQueryFn
           .query_latest_statistics_timestamp(client))
-      logging.info('Latest stats timestamp for kind %s is %s',
+      _LOGGER.info('Latest stats timestamp for kind %s is %s',
                    kind_name, latest_timestamp)
 
       if client.namespace is None:
@@ -243,12 +246,12 @@
         estimated_size_bytes = (
             ReadFromDatastore._SplitQueryFn
             .get_estimated_size_bytes(client, query))
-        logging.info('Estimated size bytes for query: %s', estimated_size_bytes)
+        _LOGGER.info('Estimated size bytes for query: %s', estimated_size_bytes)
         num_splits = int(min(ReadFromDatastore._NUM_QUERY_SPLITS_MAX, round(
             (float(estimated_size_bytes) /
              ReadFromDatastore._DEFAULT_BUNDLE_SIZE_BYTES))))
       except Exception as e:
-        logging.warning('Failed to fetch estimated size bytes: %s', e)
+        _LOGGER.warning('Failed to fetch estimated size bytes: %s', e)
         # Fallback in case estimated size is unavailable.
         num_splits = ReadFromDatastore._NUM_QUERY_SPLITS_MIN
 
@@ -360,7 +363,7 @@
       """
       # Client-side throttling.
       while throttler.throttle_request(time.time() * 1000):
-        logging.info("Delaying request for %ds due to previous failures",
+        _LOGGER.info("Delaying request for %ds due to previous failures",
                      throttle_delay)
         time.sleep(throttle_delay)
         rpc_stats_callback(throttled_secs=throttle_delay)
@@ -412,7 +415,7 @@
           self._throttler,
           rpc_stats_callback=self._update_rpc_stats,
           throttle_delay=util.WRITE_BATCH_TARGET_LATENCY_MS // 1000)
-      logging.debug("Successfully wrote %d mutations in %dms.",
+      _LOGGER.debug("Successfully wrote %d mutations in %dms.",
                     len(self._batch.mutations), latency_ms)
 
       now = time.time() * 1000
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1new/types_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1new/types_test.py
index 21633d9..c3bf8ef 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1new/types_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1new/types_test.py
@@ -40,6 +40,9 @@
   client = None
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 @unittest.skipIf(client is None, 'Datastore dependencies are not installed')
 class TypesTest(unittest.TestCase):
   _PROJECT = 'project'
@@ -168,7 +171,7 @@
     self.assertEqual(order, cq.order)
     self.assertEqual(distinct_on, cq.distinct_on)
 
-    logging.info('query: %s', q)  # Test __repr__()
+    _LOGGER.info('query: %s', q)  # Test __repr__()
 
   def testValueProviderFilters(self):
     self.vp_filters = [
@@ -193,7 +196,7 @@
       cq = q._to_client_query(self._test_client)
       self.assertEqual(exp_filter, cq.filters)
 
-      logging.info('query: %s', q)  # Test __repr__()
+      _LOGGER.info('query: %s', q)  # Test __repr__()
 
   def testQueryEmptyNamespace(self):
     # Test that we can pass a namespace of None.
diff --git a/sdks/python/apache_beam/io/gcp/datastore_write_it_pipeline.py b/sdks/python/apache_beam/io/gcp/datastore_write_it_pipeline.py
index 67e375f..2d0be8f 100644
--- a/sdks/python/apache_beam/io/gcp/datastore_write_it_pipeline.py
+++ b/sdks/python/apache_beam/io/gcp/datastore_write_it_pipeline.py
@@ -57,6 +57,9 @@
 # pylint: enable=ungrouped-imports
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 def new_pipeline_with_job_name(pipeline_options, job_name, suffix):
   """Create a pipeline with the given job_name and a suffix."""
   gcp_options = pipeline_options.view_as(GoogleCloudOptions)
@@ -137,7 +140,7 @@
 
   # Pipeline 1: Create and write the specified number of Entities to the
   # Cloud Datastore.
-  logging.info('Writing %s entities to %s', num_entities, project)
+  _LOGGER.info('Writing %s entities to %s', num_entities, project)
   p = new_pipeline_with_job_name(pipeline_options, job_name, '-write')
 
   # pylint: disable=expression-not-assigned
@@ -152,7 +155,7 @@
   # Optional Pipeline 2: If a read limit was provided, read it and confirm
   # that the expected entities were read.
   if known_args.limit is not None:
-    logging.info('Querying a limited set of %s entities and verifying count.',
+    _LOGGER.info('Querying a limited set of %s entities and verifying count.',
                  known_args.limit)
     p = new_pipeline_with_job_name(pipeline_options, job_name, '-verify-limit')
     query_with_limit = query_pb2.Query()
@@ -167,7 +170,7 @@
     p.run()
 
   # Pipeline 3: Query the written Entities and verify result.
-  logging.info('Querying entities, asserting they match.')
+  _LOGGER.info('Querying entities, asserting they match.')
   p = new_pipeline_with_job_name(pipeline_options, job_name, '-verify')
   entities = p | 'read from datastore' >> ReadFromDatastore(project, query)
 
@@ -178,7 +181,7 @@
   p.run()
 
   # Pipeline 4: Delete Entities.
-  logging.info('Deleting entities.')
+  _LOGGER.info('Deleting entities.')
   p = new_pipeline_with_job_name(pipeline_options, job_name, '-delete')
   entities = p | 'read from datastore' >> ReadFromDatastore(project, query)
   # pylint: disable=expression-not-assigned
@@ -189,7 +192,7 @@
   p.run()
 
   # Pipeline 5: Query the written Entities, verify no results.
-  logging.info('Querying for the entities to make sure there are none present.')
+  _LOGGER.info('Querying for the entities to make sure there are none present.')
   p = new_pipeline_with_job_name(pipeline_options, job_name, '-verify-deleted')
   entities = p | 'read from datastore' >> ReadFromDatastore(project, query)
 
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index dfdc29d..c1e0314 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -44,6 +44,9 @@
 __all__ = ['GcsIO']
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 # Issue a friendlier error message if the storage library is not available.
 # TODO(silviuc): Remove this guard when storage is available everywhere.
 try:
@@ -250,7 +253,7 @@
         maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
     response = self.client.objects.Rewrite(request)
     while not response.done:
-      logging.debug(
+      _LOGGER.debug(
           'Rewrite progress: %d of %d bytes, %s to %s',
           response.totalBytesRewritten, response.objectSize, src, dest)
       request.rewriteToken = response.rewriteToken
@@ -258,7 +261,7 @@
       if self._rewrite_cb is not None:
         self._rewrite_cb(response)
 
-    logging.debug('Rewrite done: %s to %s', src, dest)
+    _LOGGER.debug('Rewrite done: %s to %s', src, dest)
 
   # We intentionally do not decorate this method with a retry, as retrying is
   # handled in BatchApiRequest.Execute().
@@ -320,12 +323,12 @@
                 GcsIOError(errno.ENOENT, 'Source file not found: %s' % src))
           pair_to_status[pair] = exception
         elif not response.done:
-          logging.debug(
+          _LOGGER.debug(
               'Rewrite progress: %d of %d bytes, %s to %s',
               response.totalBytesRewritten, response.objectSize, src, dest)
           pair_to_request[pair].rewriteToken = response.rewriteToken
         else:
-          logging.debug('Rewrite done: %s to %s', src, dest)
+          _LOGGER.debug('Rewrite done: %s to %s', src, dest)
           pair_to_status[pair] = None
 
     return [(pair[0], pair[1], pair_to_status[pair]) for pair in src_dest_pairs]
@@ -458,7 +461,7 @@
     file_sizes = {}
     counter = 0
     start_time = time.time()
-    logging.info("Starting the size estimation of the input")
+    _LOGGER.info("Starting the size estimation of the input")
     while True:
       response = self.client.objects.List(request)
       for item in response.items:
@@ -466,12 +469,12 @@
         file_sizes[file_name] = item.size
         counter += 1
         if counter % 10000 == 0:
-          logging.info("Finished computing size of: %s files", len(file_sizes))
+          _LOGGER.info("Finished computing size of: %s files", len(file_sizes))
       if response.nextPageToken:
         request.pageToken = response.nextPageToken
       else:
         break
-    logging.info("Finished listing %s files in %s seconds.",
+    _LOGGER.info("Finished listing %s files in %s seconds.",
                  counter, time.time() - start_time)
     return file_sizes
 
@@ -492,7 +495,7 @@
       if http_error.status_code == 404:
         raise IOError(errno.ENOENT, 'Not found: %s' % self._path)
       else:
-        logging.error('HTTP error while requesting file %s: %s', self._path,
+        _LOGGER.error('HTTP error while requesting file %s: %s', self._path,
                       http_error)
         raise
     self._size = metadata.size
@@ -564,7 +567,7 @@
     try:
       self._client.objects.Insert(self._insert_request, upload=self._upload)
     except Exception as e:  # pylint: disable=broad-except
-      logging.error('Error in _start_upload while inserting file %s: %s',
+      _LOGGER.error('Error in _start_upload while inserting file %s: %s',
                     self._path, traceback.format_exc())
       self._upload_thread.last_error = e
     finally:
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_overrides.py b/sdks/python/apache_beam/io/gcp/gcsio_overrides.py
index a5fc749..9664348 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_overrides.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_overrides.py
@@ -27,6 +27,9 @@
 from apitools.base.py import util
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 class GcsIOOverrides(object):
   """Functions for overriding Google Cloud Storage I/O client."""
 
@@ -37,13 +40,13 @@
     # handling GCS download throttling errors (BEAM-7424)
     if (isinstance(retry_args.exc, exceptions.BadStatusCodeError) and
         retry_args.exc.status_code == http_wrapper.TOO_MANY_REQUESTS):
-      logging.debug(
+      _LOGGER.debug(
           'Caught GCS quota error (%s), retrying.', retry_args.exc.status_code)
     else:
       return http_wrapper.HandleExceptionsAndRebuildHttpConnections(retry_args)
 
     http_wrapper.RebuildHttpConnections(retry_args.http)
-    logging.debug('Retrying request to url %s after exception %s',
+    _LOGGER.debug('Retrying request to url %s after exception %s',
                   retry_args.http_request.url, retry_args.exc)
     sleep_seconds = util.CalculateWaitForRetry(
         retry_args.num_retries, max_wait=retry_args.max_retry_wait)
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
index c3394a1..3ec3999 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -46,6 +46,9 @@
 MAX_RETRIES = 5
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 def retry_on_http_and_value_error(exception):
   """Filter allowing retries on Bigquery errors and value error."""
   return isinstance(exception, (GoogleCloudError, ValueError))
@@ -83,10 +86,10 @@
   def _matches(self, _):
     if self.checksum is None:
       response = self._query_with_retry()
-      logging.info('Read from given query (%s), total rows %d',
+      _LOGGER.info('Read from given query (%s), total rows %d',
                    self.query, len(response))
       self.checksum = compute_hash(response)
-      logging.info('Generate checksum: %s', self.checksum)
+      _LOGGER.info('Generate checksum: %s', self.checksum)
 
     return self.checksum == self.expected_checksum
 
@@ -95,7 +98,7 @@
       retry_filter=retry_on_http_and_value_error)
   def _query_with_retry(self):
     """Run Bigquery query with retry if got error http response"""
-    logging.info('Attempting to perform query %s to BQ', self.query)
+    _LOGGER.info('Attempting to perform query %s to BQ', self.query)
     # Create client here since it throws an exception if pickled.
     bigquery_client = bigquery.Client(self.project)
     query_job = bigquery_client.query(self.query)
@@ -134,7 +137,7 @@
   def _matches(self, _):
     if self.actual_data is None:
       self.actual_data = self._get_query_result()
-      logging.info('Result of query is: %r', self.actual_data)
+      _LOGGER.info('Result of query is: %r', self.actual_data)
 
     try:
       equal_to(self.expected_data)(self.actual_data)
@@ -179,7 +182,7 @@
       response = self._query_with_retry()
       if len(response) >= len(self.expected_data):
         return response
-      logging.debug('Query result contains %d rows' % len(response))
+      _LOGGER.debug('Query result contains %d rows' % len(response))
       time.sleep(1)
     if sys.version_info >= (3,):
       raise TimeoutError('Timeout exceeded for matcher.') # noqa: F821
@@ -207,13 +210,13 @@
     return bigquery_wrapper.get_table(self.project, self.dataset, self.table)
 
   def _matches(self, _):
-    logging.info('Start verify Bigquery table properties.')
+    _LOGGER.info('Start verify Bigquery table properties.')
     # Run query
     bigquery_wrapper = bigquery_tools.BigQueryWrapper()
 
     self.actual_table = self._get_table_with_retry(bigquery_wrapper)
 
-    logging.info('Table proto is %s', self.actual_table)
+    _LOGGER.info('Table proto is %s', self.actual_table)
 
     return all(
         self._match_property(v, self._get_or_none(self.actual_table, k))
@@ -231,7 +234,7 @@
 
   @staticmethod
   def _match_property(expected, actual):
-    logging.info("Matching %s to %s", expected, actual)
+    _LOGGER.info("Matching %s to %s", expected, actual)
     if isinstance(expected, dict):
       return all(
           BigQueryTableMatcher._match_property(
diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
index 7a0b5c8..aaa86f6 100644
--- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
@@ -40,6 +40,9 @@
 MAX_MESSAGES_IN_ONE_PULL = 50
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 class PubSubMessageMatcher(BaseMatcher):
   """Matcher that verifies messages from given subscription.
 
@@ -123,7 +126,7 @@
       time.sleep(1)
 
     if time.time() - start_time > timeout:
-      logging.error('Timeout after %d sec. Received %d messages from %s.',
+      _LOGGER.error('Timeout after %d sec. Received %d messages from %s.',
                     timeout, len(total_messages), self.sub_name)
     return total_messages
 
diff --git a/sdks/python/apache_beam/io/gcp/tests/utils.py b/sdks/python/apache_beam/io/gcp/tests/utils.py
index 4ed9af3..dbf8ac9 100644
--- a/sdks/python/apache_beam/io/gcp/tests/utils.py
+++ b/sdks/python/apache_beam/io/gcp/tests/utils.py
@@ -37,6 +37,9 @@
   bigquery = None
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 class GcpTestIOError(retry.PermanentException):
   """Basic GCP IO error for testing. Function that raises this error should
   not be retried."""
@@ -93,7 +96,7 @@
     dataset_id: Name of the dataset where table is.
     table_id: Name of the table.
   """
-  logging.info('Clean up a BigQuery table with project: %s, dataset: %s, '
+  _LOGGER.info('Clean up a BigQuery table with project: %s, dataset: %s, '
                'table: %s.', project, dataset_id, table_id)
   client = bigquery.Client(project=project)
   table_ref = client.dataset(dataset_id).table(table_id)
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem.py b/sdks/python/apache_beam/io/hadoopfilesystem.py
index 71d74e8..1d7d5cc 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem.py
@@ -56,6 +56,9 @@
 _FILE_STATUS_TYPE_FILE = 'FILE'
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 class HdfsDownloader(filesystemio.Downloader):
 
   def __init__(self, hdfs_client, path):
@@ -196,7 +199,7 @@
   @staticmethod
   def _add_compression(stream, path, mime_type, compression_type):
     if mime_type != 'application/octet-stream':
-      logging.warning('Mime types are not supported. Got non-default mime_type:'
+      _LOGGER.warning('Mime types are not supported. Got non-default mime_type:'
                       ' %s', mime_type)
     if compression_type == CompressionTypes.AUTO:
       compression_type = CompressionTypes.detect_compression_type(path)
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index e21052f..dd97b7f 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -61,6 +61,9 @@
            'Sink', 'Write', 'Writer']
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 # Encapsulates information about a bundle of a source generated when method
 # BoundedSource.split() is invoked.
 # This is a named 4-tuple that has following fields.
@@ -1075,7 +1078,7 @@
   write_results = list(write_results)
   extra_shards = []
   if len(write_results) < min_shards:
-    logging.debug(
+    _LOGGER.debug(
         'Creating %s empty shard(s).', min_shards - len(write_results))
     for _ in range(min_shards - len(write_results)):
       writer = sink.open_writer(init_result, str(uuid.uuid4()))
diff --git a/sdks/python/apache_beam/io/mongodbio.py b/sdks/python/apache_beam/io/mongodbio.py
index 6004ca1..5abab237 100644
--- a/sdks/python/apache_beam/io/mongodbio.py
+++ b/sdks/python/apache_beam/io/mongodbio.py
@@ -66,6 +66,10 @@
 from apache_beam.transforms import Reshuffle
 from apache_beam.utils.annotations import experimental
 
+
+_LOGGER = logging.getLogger(__name__)
+
+
 try:
   # Mongodb has its own bundled bson, which is not compatible with the bson package.
   # (https://github.com/py-bson/bson/issues/82). Try to import objectid and if
@@ -80,7 +84,7 @@
   from pymongo import ReplaceOne
 except ImportError:
   objectid = None
-  logging.warning("Could not find a compatible bson package.")
+  _LOGGER.warning("Could not find a compatible bson package.")
 
 __all__ = ['ReadFromMongoDB', 'WriteToMongoDB']
 
@@ -497,7 +501,7 @@
                      replacement=doc,
                      upsert=True))
     resp = self.client[self.db][self.coll].bulk_write(requests)
-    logging.debug('BulkWrite to MongoDB result in nModified:%d, nUpserted:%d, '
+    _LOGGER.debug('BulkWrite to MongoDB result in nModified:%d, nUpserted:%d, '
                   'nMatched:%d, Errors:%s' %
                   (resp.modified_count, resp.upserted_count, resp.matched_count,
                    resp.bulk_api_result.get('writeErrors')))
diff --git a/sdks/python/apache_beam/io/mongodbio_it_test.py b/sdks/python/apache_beam/io/mongodbio_it_test.py
index bfc6099..7189ac6 100644
--- a/sdks/python/apache_beam/io/mongodbio_it_test.py
+++ b/sdks/python/apache_beam/io/mongodbio_it_test.py
@@ -28,6 +28,9 @@
 from apache_beam.testing.util import equal_to
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 def run(argv=None):
   default_db = 'beam_mongodbio_it_db'
   default_coll = 'integration_test_%d' % time.time()
@@ -54,7 +57,7 @@
   # Test Write to MongoDB
   with TestPipeline(options=PipelineOptions(pipeline_args)) as p:
     start_time = time.time()
-    logging.info('Writing %d documents to mongodb' % known_args.num_documents)
+    _LOGGER.info('Writing %d documents to mongodb' % known_args.num_documents)
     docs = [{
         'number': x,
         'number_mod_2': x % 2,
@@ -67,13 +70,13 @@
                                                        known_args.mongo_coll,
                                                        known_args.batch_size)
   elapsed = time.time() - start_time
-  logging.info('Writing %d documents to mongodb finished in %.3f seconds' %
+  _LOGGER.info('Writing %d documents to mongodb finished in %.3f seconds' %
                (known_args.num_documents, elapsed))
 
   # Test Read from MongoDB
   with TestPipeline(options=PipelineOptions(pipeline_args)) as p:
     start_time = time.time()
-    logging.info('Reading from mongodb %s:%s' %
+    _LOGGER.info('Reading from mongodb %s:%s' %
                  (known_args.mongo_db, known_args.mongo_coll))
     r = p | 'ReadFromMongoDB' >> \
                 beam.io.ReadFromMongoDB(known_args.mongo_uri,
@@ -85,7 +88,7 @@
         r, equal_to([number for number in range(known_args.num_documents)]))
 
   elapsed = time.time() - start_time
-  logging.info('Read %d documents from mongodb finished in %.3f seconds' %
+  _LOGGER.info('Read %d documents from mongodb finished in %.3f seconds' %
                (known_args.num_documents, elapsed))
 
 
diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py
index c46f801..d4845fb 100644
--- a/sdks/python/apache_beam/io/range_trackers.py
+++ b/sdks/python/apache_beam/io/range_trackers.py
@@ -34,6 +34,9 @@
            'OrderedPositionRangeTracker', 'UnsplittableRangeTracker']
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 class OffsetRangeTracker(iobase.RangeTracker):
   """A 'RangeTracker' for non-negative positions of type 'long'."""
 
@@ -137,27 +140,27 @@
     assert isinstance(split_offset, (int, long))
     with self._lock:
       if self._stop_offset == OffsetRangeTracker.OFFSET_INFINITY:
-        logging.debug('refusing to split %r at %d: stop position unspecified',
+        _LOGGER.debug('refusing to split %r at %d: stop position unspecified',
                       self, split_offset)
         return
       if self._last_record_start == -1:
-        logging.debug('Refusing to split %r at %d: unstarted', self,
+        _LOGGER.debug('Refusing to split %r at %d: unstarted', self,
                       split_offset)
         return
 
       if split_offset <= self._last_record_start:
-        logging.debug(
+        _LOGGER.debug(
             'Refusing to split %r at %d: already past proposed stop offset',
             self, split_offset)
         return
       if (split_offset < self.start_position()
           or split_offset >= self.stop_position()):
-        logging.debug(
+        _LOGGER.debug(
             'Refusing to split %r at %d: proposed split position out of range',
             self, split_offset)
         return
 
-      logging.debug('Agreeing to split %r at %d', self, split_offset)
+      _LOGGER.debug('Agreeing to split %r at %d', self, split_offset)
 
       split_fraction = (float(split_offset - self._start_offset) / (
           self._stop_offset - self._start_offset))
diff --git a/sdks/python/apache_beam/io/source_test_utils.py b/sdks/python/apache_beam/io/source_test_utils.py
index d83a62a..7291786 100644
--- a/sdks/python/apache_beam/io/source_test_utils.py
+++ b/sdks/python/apache_beam/io/source_test_utils.py
@@ -68,6 +68,9 @@
            'assert_split_at_fraction_succeeds_and_consistent']
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 class ExpectedSplitOutcome(object):
   MUST_SUCCEED_AND_BE_CONSISTENT = 1
   MUST_FAIL = 2
@@ -588,7 +591,7 @@
         num_trials += 1
         if (num_trials >
             MAX_CONCURRENT_SPLITTING_TRIALS_PER_ITEM):
-          logging.warning(
+          _LOGGER.warning(
               'After %d concurrent splitting trials at item #%d, observed '
               'only %s, giving up on this item',
               num_trials,
@@ -604,7 +607,7 @@
           have_failure = True
 
         if have_success and have_failure:
-          logging.info('%d trials to observe both success and failure of '
+          _LOGGER.info('%d trials to observe both success and failure of '
                        'concurrent splitting at item #%d', num_trials, i)
           break
     finally:
@@ -613,11 +616,11 @@
     num_total_trials += num_trials
 
     if num_total_trials > MAX_CONCURRENT_SPLITTING_TRIALS_TOTAL:
-      logging.warning('After %d total concurrent splitting trials, considered '
+      _LOGGER.warning('After %d total concurrent splitting trials, considered '
                       'only %d items, giving up.', num_total_trials, i)
       break
 
-  logging.info('%d total concurrent splitting trials for %d items',
+  _LOGGER.info('%d total concurrent splitting trials for %d items',
                num_total_trials, len(expected_items))
 
 
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index 340449f..3b426cc 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -42,6 +42,9 @@
            'WriteToText']
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 class _TextSource(filebasedsource.FileBasedSource):
   r"""A source for reading text files.
 
@@ -127,7 +130,7 @@
       raise ValueError('Cannot skip negative number of header lines: %d'
                        % skip_header_lines)
     elif skip_header_lines > 10:
-      logging.warning(
+      _LOGGER.warning(
           'Skipping %d header lines. Skipping large number of header '
-          'lines might significantly slow down processing.')
+          'lines might significantly slow down processing.', skip_header_lines)
     self._skip_header_lines = skip_header_lines
diff --git a/sdks/python/apache_beam/io/tfrecordio.py b/sdks/python/apache_beam/io/tfrecordio.py
index 7b0bd87..ab7d2f5 100644
--- a/sdks/python/apache_beam/io/tfrecordio.py
+++ b/sdks/python/apache_beam/io/tfrecordio.py
@@ -38,6 +38,9 @@
 __all__ = ['ReadFromTFRecord', 'WriteToTFRecord']
 
 
+_LOGGER = logging.getLogger(__name__)
+
+
 def _default_crc32c_fn(value):
   """Calculates crc32c of a bytes object using either snappy or crcmod."""
 
@@ -54,7 +57,7 @@
       pass
 
     if not _default_crc32c_fn.fn:
-      logging.warning('Couldn\'t find python-snappy so the implementation of '
+      _LOGGER.warning('Couldn\'t find python-snappy so the implementation of '
                       '_TFRecordUtil._masked_crc32c is not as fast as it could '
                       'be.')
       _default_crc32c_fn.fn = crcmod.predefined.mkPredefinedCrcFun('crc-32c')
diff --git a/sdks/python/apache_beam/io/vcfio.py b/sdks/python/apache_beam/io/vcfio.py
index 2a13bf8..aed3579 100644
--- a/sdks/python/apache_beam/io/vcfio.py
+++ b/sdks/python/apache_beam/io/vcfio.py
@@ -51,6 +51,9 @@
 __all__ = ['ReadFromVcf', 'Variant', 'VariantCall', 'VariantInfo',
            'MalformedVcfRecord']
 
+
+_LOGGER = logging.getLogger(__name__)
+
 # Stores data about variant INFO fields. The type of 'data' is specified in the
 # VCF headers. 'field_count' is a string that specifies the number of fields
 # that the data type contains. Its value can either be a number representing a
@@ -346,7 +349,7 @@
                                                self._vcf_reader.formats)
       except (LookupError, ValueError):
         if self._allow_malformed_records:
-          logging.warning(
+          _LOGGER.warning(
               'An exception was raised when reading record from VCF file '
               '%s. Invalid record was %s: %s',
               self._file_name, self._last_record, traceback.format_exc())