[BEAM-8661] Moving io module to have per-module logger
diff --git a/sdks/python/apache_beam/io/filebasedsink.py b/sdks/python/apache_beam/io/filebasedsink.py
index 3ff420f..76143dd 100644
--- a/sdks/python/apache_beam/io/filebasedsink.py
+++ b/sdks/python/apache_beam/io/filebasedsink.py
@@ -45,6 +45,9 @@
__all__ = ['FileBasedSink']
+_LOGGER = logging.getLogger(__name__)
+
+
class FileBasedSink(iobase.Sink):
"""A sink to a GCS or local files.
@@ -210,7 +213,7 @@
for file_metadata in mr.metadata_list]
if dst_glob_files:
- logging.warning('Deleting %d existing files in target path matching: %s',
+ _LOGGER.warning('Deleting %d existing files in target path matching: %s',
len(dst_glob_files), self.shard_name_glob_format)
FileSystems.delete(dst_glob_files)
@@ -250,12 +253,12 @@
raise BeamIOError('src and dst files do not exist. src: %s, dst: %s' % (
src, dst))
if not src_exists and dst_exists:
- logging.debug('src: %s -> dst: %s already renamed, skipping', src, dst)
+ _LOGGER.debug('src: %s -> dst: %s already renamed, skipping', src, dst)
num_skipped += 1
continue
if (src_exists and dst_exists and
FileSystems.checksum(src) == FileSystems.checksum(dst)):
- logging.debug('src: %s == dst: %s, deleting src', src, dst)
+ _LOGGER.debug('src: %s == dst: %s, deleting src', src, dst)
delete_files.append(src)
continue
@@ -284,7 +287,7 @@
for i in range(0, len(dst_files), chunk_size)]
if num_shards_to_finalize:
- logging.info(
+ _LOGGER.info(
'Starting finalize_write threads with num_shards: %d (skipped: %d), '
'batches: %d, num_threads: %d',
num_shards_to_finalize, num_skipped, len(source_file_batch),
@@ -304,11 +307,11 @@
raise
for (src, dst), exception in iteritems(exp.exception_details):
if exception:
- logging.error(('Exception in _rename_batch. src: %s, '
+ _LOGGER.error(('Exception in _rename_batch. src: %s, '
'dst: %s, err: %s'), src, dst, exception)
exceptions.append(exception)
else:
- logging.debug('Rename successful: %s -> %s', src, dst)
+ _LOGGER.debug('Rename successful: %s -> %s', src, dst)
return exceptions
exception_batches = util.run_using_threadpool(
@@ -324,10 +327,10 @@
for final_name in dst_files:
yield final_name
- logging.info('Renamed %d shards in %.2f seconds.', num_shards_to_finalize,
+ _LOGGER.info('Renamed %d shards in %.2f seconds.', num_shards_to_finalize,
time.time() - start_time)
else:
- logging.warning(
+ _LOGGER.warning(
'No shards found to finalize. num_shards: %d, skipped: %d',
num_shards, num_skipped)
diff --git a/sdks/python/apache_beam/io/filebasedsink_test.py b/sdks/python/apache_beam/io/filebasedsink_test.py
index 07d0e8e..1cc3d74 100644
--- a/sdks/python/apache_beam/io/filebasedsink_test.py
+++ b/sdks/python/apache_beam/io/filebasedsink_test.py
@@ -44,6 +44,9 @@
from apache_beam.transforms.display_test import DisplayDataItemMatcher
+_LOGGER = logging.getLogger(__name__)
+
+
# TODO: Refactor code so all io tests are using same library
# TestCaseWithTempDirCleanup class.
class _TestCaseWithTempDirCleanUp(unittest.TestCase):
@@ -247,7 +250,7 @@
'gs://aaa/bbb', 'gs://aaa/bbb/', 'gs://aaa', 'gs://aaa/', 'gs://',
'/')
except ValueError:
- logging.debug('Ignoring test since GCP module is not installed')
+ _LOGGER.debug('Ignoring test since GCP module is not installed')
@mock.patch('apache_beam.io.localfilesystem.os')
def test_temp_dir_local(self, filesystem_os_mock):
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 1d9fcdd..14c35bc 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -114,6 +114,9 @@
'ReadMatches']
+_LOGGER = logging.getLogger(__name__)
+
+
class EmptyMatchTreatment(object):
"""How to treat empty matches in ``MatchAll`` and ``MatchFiles`` transforms.
@@ -479,7 +482,7 @@
str,
filesystems.FileSystems.join(temp_location,
'.temp%s' % dir_uid))
- logging.info('Added temporary directory %s', self._temp_directory.get())
+ _LOGGER.info('Added temporary directory %s', self._temp_directory.get())
output = (pcoll
| beam.ParDo(_WriteUnshardedRecordsFn(
@@ -557,7 +560,7 @@
'',
destination)
- logging.info('Moving temporary file %s to dir: %s as %s. Res: %s',
+ _LOGGER.info('Moving temporary file %s to dir: %s as %s. Res: %s',
r.file_name, self.path.get(), final_file_name, r)
final_full_path = filesystems.FileSystems.join(self.path.get(),
@@ -570,7 +573,7 @@
except BeamIOError:
# This error is not serious, because it may happen on a retry of the
# bundle. We simply log it.
- logging.debug('File %s failed to be copied. This may be due to a bundle'
+ _LOGGER.debug('File %s failed to be copied. This may be due to a bundle'
' being retried.', r.file_name)
yield FileResult(final_file_name,
@@ -580,7 +583,7 @@
r.pane,
destination)
- logging.info('Cautiously removing temporary files for'
+ _LOGGER.info('Cautiously removing temporary files for'
' destination %s and window %s', destination, w)
writer_key = (destination, w)
self._remove_temporary_files(writer_key)
@@ -592,10 +595,10 @@
match_result = filesystems.FileSystems.match(['%s*' % prefix])
orphaned_files = [m.path for m in match_result[0].metadata_list]
- logging.debug('Deleting orphaned files: %s', orphaned_files)
+ _LOGGER.debug('Deleting orphaned files: %s', orphaned_files)
filesystems.FileSystems.delete(orphaned_files)
except BeamIOError as e:
- logging.debug('Exceptions when deleting files: %s', e)
+ _LOGGER.debug('Exceptions when deleting files: %s', e)
class _WriteShardedRecordsFn(beam.DoFn):
@@ -625,7 +628,7 @@
sink.flush()
writer.close()
- logging.info('Writing file %s for destination %s and shard %s',
+ _LOGGER.info('Writing file %s for destination %s and shard %s',
full_file_name, destination, repr(shard))
yield FileResult(full_file_name,
diff --git a/sdks/python/apache_beam/io/filesystemio_test.py b/sdks/python/apache_beam/io/filesystemio_test.py
index 72e7f0d..569dd14 100644
--- a/sdks/python/apache_beam/io/filesystemio_test.py
+++ b/sdks/python/apache_beam/io/filesystemio_test.py
@@ -29,6 +29,9 @@
from apache_beam.io import filesystemio
+_LOGGER = logging.getLogger(__name__)
+
+
class FakeDownloader(filesystemio.Downloader):
def __init__(self, data):
@@ -206,7 +209,7 @@
for buffer_size in buffer_sizes:
for target in [self._read_and_verify, self._read_and_seek]:
- logging.info('buffer_size=%s, target=%s' % (buffer_size, target))
+ _LOGGER.info('buffer_size=%s, target=%s' % (buffer_size, target))
parent_conn, child_conn = multiprocessing.Pipe()
stream = filesystemio.PipeStream(child_conn)
success = [False]
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
index 21de828..d357946 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
@@ -45,6 +45,9 @@
except ImportError:
pass
+
+_LOGGER = logging.getLogger(__name__)
+
WAIT_UNTIL_FINISH_DURATION_MS = 15 * 60 * 1000
BIG_QUERY_DATASET_ID = 'python_query_to_table_'
@@ -90,7 +93,7 @@
try:
self.bigquery_client.client.datasets.Delete(request)
except HttpError:
- logging.debug('Failed to clean up dataset %s' % self.dataset_id)
+ _LOGGER.debug('Failed to clean up dataset %s' % self.dataset_id)
def _setup_new_types_env(self):
table_schema = bigquery.TableSchema()
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index d3ac5ca..0280c61 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -273,6 +273,9 @@
]
+_LOGGER = logging.getLogger(__name__)
+
+
@deprecated(since='2.11.0', current="bigquery_tools.parse_table_reference")
def _parse_table_reference(table, dataset=None, project=None):
return bigquery_tools.parse_table_reference(table, dataset, project)
@@ -787,7 +790,7 @@
# and avoid the get-or-create step.
return
- logging.debug('Creating or getting table %s with schema %s.',
+ _LOGGER.debug('Creating or getting table %s with schema %s.',
table_reference, schema)
table_schema = self.get_table_schema(schema)
@@ -833,7 +836,7 @@
return self._flush_all_batches()
def _flush_all_batches(self):
- logging.debug('Attempting to flush to all destinations. Total buffered: %s',
+ _LOGGER.debug('Attempting to flush to all destinations. Total buffered: %s',
self._total_buffered_rows)
return itertools.chain(*[self._flush_batch(destination)
@@ -850,7 +853,7 @@
table_reference.projectId = vp.RuntimeValueProvider.get_value(
'project', str, '')
- logging.debug('Flushing data to %s. Total %s rows.',
+ _LOGGER.debug('Flushing data to %s. Total %s rows.',
destination, len(rows_and_insert_ids))
rows = [r[0] for r in rows_and_insert_ids]
@@ -865,7 +868,7 @@
insert_ids=insert_ids,
skip_invalid_rows=True)
- logging.debug("Passed: %s. Errors are %s", passed, errors)
+ _LOGGER.debug("Passed: %s. Errors are %s", passed, errors)
failed_rows = [rows[entry.index] for entry in errors]
should_retry = any(
bigquery_tools.RetryStrategy.should_retry(
@@ -877,7 +880,7 @@
break
else:
retry_backoff = next(self._backoff_calculator)
- logging.info('Sleeping %s seconds before retrying insertion.',
+ _LOGGER.info('Sleeping %s seconds before retrying insertion.',
retry_backoff)
time.sleep(retry_backoff)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index cb285ea..a6043b4 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -46,6 +46,9 @@
from apache_beam.transforms import trigger
from apache_beam.transforms.window import GlobalWindows
+
+_LOGGER = logging.getLogger(__name__)
+
ONE_TERABYTE = (1 << 40)
# The maximum file size for imports is 5TB. We keep our files under that.
@@ -320,7 +323,7 @@
copy_to_reference.datasetId,
copy_to_reference.tableId)))
- logging.info("Triggering copy job from %s to %s",
+ _LOGGER.info("Triggering copy job from %s to %s",
copy_from_reference, copy_to_reference)
job_reference = self.bq_wrapper._insert_copy_job(
copy_to_reference.projectId,
@@ -407,7 +410,7 @@
uid = _bq_uuid()
job_name = '%s_%s_%s' % (
load_job_name_prefix, destination_hash, uid)
- logging.debug('Load job has %s files. Job name is %s.',
+ _LOGGER.debug('Load job has %s files. Job name is %s.',
len(files), job_name)
if self.temporary_tables:
@@ -415,7 +418,7 @@
table_reference.tableId = job_name
yield pvalue.TaggedOutput(TriggerLoadJobs.TEMP_TABLES, table_reference)
- logging.info('Triggering job %s to load data to BigQuery table %s.'
+ _LOGGER.info('Triggering job %s to load data to BigQuery table %s.'
'Schema: %s. Additional parameters: %s',
job_name, table_reference,
schema, additional_parameters)
@@ -519,9 +522,9 @@
ref.jobId,
ref.location)
- logging.info("Job status: %s", job.status)
+ _LOGGER.info("Job status: %s", job.status)
if job.status.state == 'DONE' and job.status.errorResult:
- logging.warning("Job %s seems to have failed. Error Result: %s",
+ _LOGGER.warning("Job %s seems to have failed. Error Result: %s",
ref.jobId, job.status.errorResult)
self._latest_error = job.status
return WaitForBQJobs.FAILED
@@ -541,7 +544,7 @@
self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client)
def process(self, table_reference):
- logging.info("Deleting table %s", table_reference)
+ _LOGGER.info("Deleting table %s", table_reference)
table_reference = bigquery_tools.parse_table_reference(table_reference)
self.bq_wrapper._delete_table(
table_reference.projectId,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index 035be18..6a61cc2 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -58,6 +58,9 @@
HttpError = None
+_LOGGER = logging.getLogger(__name__)
+
+
_DESTINATION_ELEMENT_PAIRS = [
# DESTINATION 1
('project1:dataset1.table1', '{"name":"beam", "language":"py"}'),
@@ -609,7 +612,7 @@
self.bigquery_client = bigquery_tools.BigQueryWrapper()
self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
self.output_table = "%s.output_table" % (self.dataset_id)
- logging.info("Created dataset %s in project %s",
+ _LOGGER.info("Created dataset %s in project %s",
self.dataset_id, self.project)
@attr('IT')
@@ -794,11 +797,11 @@
projectId=self.project, datasetId=self.dataset_id,
deleteContents=True)
try:
- logging.info("Deleting dataset %s in project %s",
+ _LOGGER.info("Deleting dataset %s in project %s",
self.dataset_id, self.project)
self.bigquery_client.client.datasets.Delete(request)
except HttpError:
- logging.debug('Failed to clean up dataset %s in project %s',
+ _LOGGER.debug('Failed to clean up dataset %s in project %s',
self.dataset_id, self.project)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
index 246d2ce..ff63eda 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -46,6 +46,9 @@
# pylint: enable=wrong-import-order, wrong-import-position
+_LOGGER = logging.getLogger(__name__)
+
+
class BigQueryReadIntegrationTests(unittest.TestCase):
BIG_QUERY_DATASET_ID = 'python_read_table_'
@@ -59,7 +62,7 @@
str(int(time.time())),
random.randint(0, 10000))
self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
- logging.info("Created dataset %s in project %s",
+ _LOGGER.info("Created dataset %s in project %s",
self.dataset_id, self.project)
def tearDown(self):
@@ -67,11 +70,11 @@
projectId=self.project, datasetId=self.dataset_id,
deleteContents=True)
try:
- logging.info("Deleting dataset %s in project %s",
+ _LOGGER.info("Deleting dataset %s in project %s",
self.dataset_id, self.project)
self.bigquery_client.client.datasets.Delete(request)
except HttpError:
- logging.debug('Failed to clean up dataset %s in project %s',
+ _LOGGER.debug('Failed to clean up dataset %s in project %s',
self.dataset_id, self.project)
def create_table(self, tablename):
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index b8c8c1c..6cf4529 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -71,6 +71,9 @@
# pylint: enable=wrong-import-order, wrong-import-position
+_LOGGER = logging.getLogger(__name__)
+
+
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestTableRowJsonCoder(unittest.TestCase):
@@ -579,7 +582,7 @@
self.bigquery_client = bigquery_tools.BigQueryWrapper()
self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
self.output_table = "%s.output_table" % (self.dataset_id)
- logging.info("Created dataset %s in project %s",
+ _LOGGER.info("Created dataset %s in project %s",
self.dataset_id, self.project)
@attr('IT')
@@ -741,11 +744,11 @@
projectId=self.project, datasetId=self.dataset_id,
deleteContents=True)
try:
- logging.info("Deleting dataset %s in project %s",
+ _LOGGER.info("Deleting dataset %s in project %s",
self.dataset_id, self.project)
self.bigquery_client.client.datasets.Delete(request)
except HttpError:
- logging.debug('Failed to clean up dataset %s in project %s',
+ _LOGGER.debug('Failed to clean up dataset %s in project %s',
self.dataset_id, self.project)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 0649703..8510294 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -62,6 +62,9 @@
# pylint: enable=wrong-import-order, wrong-import-position
+_LOGGER = logging.getLogger(__name__)
+
+
MAX_RETRIES = 3
@@ -262,7 +265,7 @@
if response.statistics is None:
# This behavior is only expected in tests
- logging.warning(
+ _LOGGER.warning(
"Unable to get location, missing response.statistics. Query: %s",
query)
return None
@@ -274,11 +277,11 @@
table.projectId,
table.datasetId,
table.tableId)
- logging.info("Using location %r from table %r referenced by query %s",
+ _LOGGER.info("Using location %r from table %r referenced by query %s",
location, table, query)
return location
- logging.debug("Query %s does not reference any tables.", query)
+ _LOGGER.debug("Query %s does not reference any tables.", query)
return None
@retry.with_exponential_backoff(
@@ -309,9 +312,9 @@
)
)
- logging.info("Inserting job request: %s", request)
+ _LOGGER.info("Inserting job request: %s", request)
response = self.client.jobs.Insert(request)
- logging.info("Response was %s", response)
+ _LOGGER.info("Response was %s", response)
return response.jobReference
@retry.with_exponential_backoff(
@@ -442,7 +445,7 @@
request = bigquery.BigqueryTablesInsertRequest(
projectId=project_id, datasetId=dataset_id, table=table)
response = self.client.tables.Insert(request)
- logging.debug("Created the table with id %s", table_id)
+ _LOGGER.debug("Created the table with id %s", table_id)
# The response is a bigquery.Table instance.
return response
@@ -491,7 +494,7 @@
self.client.tables.Delete(request)
except HttpError as exn:
if exn.status_code == 404:
- logging.warning('Table %s:%s.%s does not exist', project_id,
+ _LOGGER.warning('Table %s:%s.%s does not exist', project_id,
dataset_id, table_id)
return
else:
@@ -508,7 +511,7 @@
self.client.datasets.Delete(request)
except HttpError as exn:
if exn.status_code == 404:
- logging.warning('Dataset %s:%s does not exist', project_id,
+ _LOGGER.warning('Dataset %s:%s does not exist', project_id,
dataset_id)
return
else:
@@ -537,7 +540,7 @@
% (project_id, dataset_id))
except HttpError as exn:
if exn.status_code == 404:
- logging.warning(
+ _LOGGER.warning(
'Dataset %s:%s does not exist so we will create it as temporary '
'with location=%s',
project_id, dataset_id, location)
@@ -555,7 +558,7 @@
projectId=project_id, datasetId=temp_table.datasetId))
except HttpError as exn:
if exn.status_code == 404:
- logging.warning('Dataset %s:%s does not exist', project_id,
+ _LOGGER.warning('Dataset %s:%s does not exist', project_id,
temp_table.datasetId)
return
else:
@@ -669,12 +672,12 @@
additional_parameters=additional_create_parameters)
except HttpError as exn:
if exn.status_code == 409:
- logging.debug('Skipping Creation. Table %s:%s.%s already exists.'
+ _LOGGER.debug('Skipping Creation. Table %s:%s.%s already exists.'
% (project_id, dataset_id, table_id))
created_table = self.get_table(project_id, dataset_id, table_id)
else:
raise
- logging.info('Created table %s.%s.%s with schema %s. '
+ _LOGGER.info('Created table %s.%s.%s with schema %s. '
'Result: %s.',
project_id, dataset_id, table_id,
schema or found_table.schema,
@@ -684,7 +687,7 @@
if write_disposition == BigQueryDisposition.WRITE_TRUNCATE:
# BigQuery can route data to the old table for 2 mins max so wait
# that much time before creating the table and writing it
- logging.warning('Sleeping for 150 seconds before the write as ' +
+ _LOGGER.warning('Sleeping for 150 seconds before the write as ' +
'BigQuery inserts can be routed to deleted table ' +
'for 2 mins after the delete and create.')
# TODO(BEAM-2673): Remove this sleep by migrating to load api
@@ -713,7 +716,7 @@
# request not for the actual execution of the query in the service. If
# the request times out we keep trying. This situation is quite possible
# if the query will return a large number of rows.
- logging.info('Waiting on response from query: %s ...', query)
+ _LOGGER.info('Waiting on response from query: %s ...', query)
time.sleep(1.0)
continue
# We got some results. The last page is signalled by a missing pageToken.
@@ -975,7 +978,7 @@
def _flush_rows_buffer(self):
if self.rows_buffer:
- logging.info('Writing %d rows to %s:%s.%s table.', len(self.rows_buffer),
+ _LOGGER.info('Writing %d rows to %s:%s.%s table.', len(self.rows_buffer),
self.project_id, self.dataset_id, self.table_id)
passed, errors = self.client.insert_rows(
project_id=self.project_id, dataset_id=self.dataset_id,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index 3658b9c..ae56e35 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -48,6 +48,9 @@
# pylint: enable=wrong-import-order, wrong-import-position
+_LOGGER = logging.getLogger(__name__)
+
+
class BigQueryWriteIntegrationTests(unittest.TestCase):
BIG_QUERY_DATASET_ID = 'python_write_to_table_'
@@ -61,7 +64,7 @@
str(int(time.time())),
random.randint(0, 10000))
self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
- logging.info("Created dataset %s in project %s",
+ _LOGGER.info("Created dataset %s in project %s",
self.dataset_id, self.project)
def tearDown(self):
@@ -69,11 +72,11 @@
projectId=self.project, datasetId=self.dataset_id,
deleteContents=True)
try:
- logging.info("Deleting dataset %s in project %s",
+ _LOGGER.info("Deleting dataset %s in project %s",
self.dataset_id, self.project)
self.bigquery_client.client.datasets.Delete(request)
except HttpError:
- logging.debug('Failed to clean up dataset %s in project %s',
+ _LOGGER.debug('Failed to clean up dataset %s in project %s',
self.dataset_id, self.project)
def create_table(self, table_name):
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
index 3d32611..a6fc816 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
@@ -45,12 +45,16 @@
from apache_beam.transforms import PTransform
from apache_beam.transforms.util import Values
+
+_LOGGER = logging.getLogger(__name__)
+
+
# Protect against environments where datastore library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
try:
from google.cloud.proto.datastore.v1 import datastore_pb2
from googledatastore import helper as datastore_helper
- logging.warning(
+ _LOGGER.warning(
'Using deprecated Datastore client.\n'
'This client will be removed in Beam 3.0 (next Beam major release).\n'
'Please migrate to apache_beam.io.gcp.datastore.v1new.datastoreio.')
@@ -125,7 +129,7 @@
'Google Cloud IO not available, '
'please install apache_beam[gcp]')
- logging.warning('datastoreio read transform is experimental.')
+ _LOGGER.warning('datastoreio read transform is experimental.')
super(ReadFromDatastore, self).__init__()
if not project:
@@ -213,13 +217,13 @@
else:
estimated_num_splits = self._num_splits
- logging.info("Splitting the query into %d splits", estimated_num_splits)
+ _LOGGER.info("Splitting the query into %d splits", estimated_num_splits)
try:
query_splits = query_splitter.get_splits(
self._datastore, query, estimated_num_splits,
helper.make_partition(self._project, self._datastore_namespace))
except Exception:
- logging.warning("Unable to parallelize the given query: %s", query,
+ _LOGGER.warning("Unable to parallelize the given query: %s", query,
exc_info=True)
query_splits = [query]
@@ -296,7 +300,7 @@
kind = query.kind[0].name
latest_timestamp = ReadFromDatastore.query_latest_statistics_timestamp(
project, namespace, datastore)
- logging.info('Latest stats timestamp for kind %s is %s',
+ _LOGGER.info('Latest stats timestamp for kind %s is %s',
kind, latest_timestamp)
kind_stats_query = (
@@ -316,13 +320,13 @@
try:
estimated_size_bytes = ReadFromDatastore.get_estimated_size_bytes(
project, namespace, query, datastore)
- logging.info('Estimated size bytes for query: %s', estimated_size_bytes)
+ _LOGGER.info('Estimated size bytes for query: %s', estimated_size_bytes)
num_splits = int(min(ReadFromDatastore._NUM_QUERY_SPLITS_MAX, round(
(float(estimated_size_bytes) /
ReadFromDatastore._DEFAULT_BUNDLE_SIZE_BYTES))))
except Exception as e:
- logging.warning('Failed to fetch estimated size bytes: %s', e)
+ _LOGGER.warning('Failed to fetch estimated size bytes: %s', e)
# Fallback in case estimated size is unavailable.
num_splits = ReadFromDatastore._NUM_QUERY_SPLITS_MIN
@@ -346,7 +350,7 @@
"""
self._project = project
self._mutation_fn = mutation_fn
- logging.warning('datastoreio write transform is experimental.')
+ _LOGGER.warning('datastoreio write transform is experimental.')
def expand(self, pcoll):
return (pcoll
@@ -424,7 +428,7 @@
self._datastore, self._project, self._mutations,
self._throttler, self._update_rpc_stats,
throttle_delay=util.WRITE_BATCH_TARGET_LATENCY_MS//1000)
- logging.debug("Successfully wrote %d mutations in %dms.",
+ _LOGGER.debug("Successfully wrote %d mutations in %dms.",
len(self._mutations), latency_ms)
if not self._fixed_batch_size:
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
index a2bc521..4ea2898 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
@@ -54,6 +54,9 @@
# pylint: enable=ungrouped-imports
+_LOGGER = logging.getLogger(__name__)
+
+
def key_comparator(k1, k2):
"""A comparator for Datastore keys.
@@ -216,7 +219,7 @@
def commit(request):
# Client-side throttling.
while throttler.throttle_request(time.time()*1000):
- logging.info("Delaying request for %ds due to previous failures",
+ _LOGGER.info("Delaying request for %ds due to previous failures",
throttle_delay)
time.sleep(throttle_delay)
rpc_stats_callback(throttled_secs=throttle_delay)
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1new/datastore_write_it_pipeline.py b/sdks/python/apache_beam/io/gcp/datastore/v1new/datastore_write_it_pipeline.py
index f5b1157..21d7781 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1new/datastore_write_it_pipeline.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1new/datastore_write_it_pipeline.py
@@ -48,6 +48,9 @@
from apache_beam.testing.util import equal_to
+_LOGGER = logging.getLogger(__name__)
+
+
def new_pipeline_with_job_name(pipeline_options, job_name, suffix):
"""Create a pipeline with the given job_name and a suffix."""
gcp_options = pipeline_options.view_as(GoogleCloudOptions)
@@ -108,7 +111,7 @@
# Pipeline 1: Create and write the specified number of Entities to the
# Cloud Datastore.
ancestor_key = Key([kind, str(uuid.uuid4())], project=project)
- logging.info('Writing %s entities to %s', num_entities, project)
+ _LOGGER.info('Writing %s entities to %s', num_entities, project)
p = new_pipeline_with_job_name(pipeline_options, job_name, '-write')
_ = (p
| 'Input' >> beam.Create(list(range(num_entities)))
@@ -121,7 +124,7 @@
# Optional Pipeline 2: If a read limit was provided, read it and confirm
# that the expected entities were read.
if known_args.limit is not None:
- logging.info('Querying a limited set of %s entities and verifying count.',
+ _LOGGER.info('Querying a limited set of %s entities and verifying count.',
known_args.limit)
p = new_pipeline_with_job_name(pipeline_options, job_name, '-verify-limit')
query.limit = known_args.limit
@@ -134,7 +137,7 @@
query.limit = None
# Pipeline 3: Query the written Entities and verify result.
- logging.info('Querying entities, asserting they match.')
+ _LOGGER.info('Querying entities, asserting they match.')
p = new_pipeline_with_job_name(pipeline_options, job_name, '-verify')
entities = p | 'read from datastore' >> ReadFromDatastore(query)
@@ -145,7 +148,7 @@
p.run()
# Pipeline 4: Delete Entities.
- logging.info('Deleting entities.')
+ _LOGGER.info('Deleting entities.')
p = new_pipeline_with_job_name(pipeline_options, job_name, '-delete')
entities = p | 'read from datastore' >> ReadFromDatastore(query)
_ = (entities
@@ -155,7 +158,7 @@
p.run()
# Pipeline 5: Query the written Entities, verify no results.
- logging.info('Querying for the entities to make sure there are none present.')
+ _LOGGER.info('Querying for the entities to make sure there are none present.')
p = new_pipeline_with_job_name(pipeline_options, job_name, '-verify-deleted')
entities = p | 'read from datastore' >> ReadFromDatastore(query)
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio.py
index f71a801..a70ea95 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio.py
@@ -51,6 +51,9 @@
__all__ = ['ReadFromDatastore', 'WriteToDatastore', 'DeleteFromDatastore']
+_LOGGER = logging.getLogger(__name__)
+
+
@typehints.with_output_types(types.Entity)
class ReadFromDatastore(PTransform):
"""A ``PTransform`` for querying Google Cloud Datastore.
@@ -173,11 +176,11 @@
else:
estimated_num_splits = self._num_splits
- logging.info("Splitting the query into %d splits", estimated_num_splits)
+ _LOGGER.info("Splitting the query into %d splits", estimated_num_splits)
query_splits = query_splitter.get_splits(
client, query, estimated_num_splits)
except query_splitter.QuerySplitterError:
- logging.info("Unable to parallelize the given query: %s", query,
+ _LOGGER.info("Unable to parallelize the given query: %s", query,
exc_info=True)
query_splits = [query]
@@ -219,7 +222,7 @@
latest_timestamp = (
ReadFromDatastore._SplitQueryFn
.query_latest_statistics_timestamp(client))
- logging.info('Latest stats timestamp for kind %s is %s',
+ _LOGGER.info('Latest stats timestamp for kind %s is %s',
kind_name, latest_timestamp)
if client.namespace is None:
@@ -243,12 +246,12 @@
estimated_size_bytes = (
ReadFromDatastore._SplitQueryFn
.get_estimated_size_bytes(client, query))
- logging.info('Estimated size bytes for query: %s', estimated_size_bytes)
+ _LOGGER.info('Estimated size bytes for query: %s', estimated_size_bytes)
num_splits = int(min(ReadFromDatastore._NUM_QUERY_SPLITS_MAX, round(
(float(estimated_size_bytes) /
ReadFromDatastore._DEFAULT_BUNDLE_SIZE_BYTES))))
except Exception as e:
- logging.warning('Failed to fetch estimated size bytes: %s', e)
+ _LOGGER.warning('Failed to fetch estimated size bytes: %s', e)
# Fallback in case estimated size is unavailable.
num_splits = ReadFromDatastore._NUM_QUERY_SPLITS_MIN
@@ -360,7 +363,7 @@
"""
# Client-side throttling.
while throttler.throttle_request(time.time() * 1000):
- logging.info("Delaying request for %ds due to previous failures",
+ _LOGGER.info("Delaying request for %ds due to previous failures",
throttle_delay)
time.sleep(throttle_delay)
rpc_stats_callback(throttled_secs=throttle_delay)
@@ -412,7 +415,7 @@
self._throttler,
rpc_stats_callback=self._update_rpc_stats,
throttle_delay=util.WRITE_BATCH_TARGET_LATENCY_MS // 1000)
- logging.debug("Successfully wrote %d mutations in %dms.",
+ _LOGGER.debug("Successfully wrote %d mutations in %dms.",
len(self._batch.mutations), latency_ms)
now = time.time() * 1000
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1new/types_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1new/types_test.py
index 21633d9..c3bf8ef 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1new/types_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1new/types_test.py
@@ -40,6 +40,9 @@
client = None
+_LOGGER = logging.getLogger(__name__)
+
+
@unittest.skipIf(client is None, 'Datastore dependencies are not installed')
class TypesTest(unittest.TestCase):
_PROJECT = 'project'
@@ -168,7 +171,7 @@
self.assertEqual(order, cq.order)
self.assertEqual(distinct_on, cq.distinct_on)
- logging.info('query: %s', q) # Test __repr__()
+ _LOGGER.info('query: %s', q) # Test __repr__()
def testValueProviderFilters(self):
self.vp_filters = [
@@ -193,7 +196,7 @@
cq = q._to_client_query(self._test_client)
self.assertEqual(exp_filter, cq.filters)
- logging.info('query: %s', q) # Test __repr__()
+ _LOGGER.info('query: %s', q) # Test __repr__()
def testQueryEmptyNamespace(self):
# Test that we can pass a namespace of None.
diff --git a/sdks/python/apache_beam/io/gcp/datastore_write_it_pipeline.py b/sdks/python/apache_beam/io/gcp/datastore_write_it_pipeline.py
index 67e375f..2d0be8f 100644
--- a/sdks/python/apache_beam/io/gcp/datastore_write_it_pipeline.py
+++ b/sdks/python/apache_beam/io/gcp/datastore_write_it_pipeline.py
@@ -57,6 +57,9 @@
# pylint: enable=ungrouped-imports
+_LOGGER = logging.getLogger(__name__)
+
+
def new_pipeline_with_job_name(pipeline_options, job_name, suffix):
"""Create a pipeline with the given job_name and a suffix."""
gcp_options = pipeline_options.view_as(GoogleCloudOptions)
@@ -137,7 +140,7 @@
# Pipeline 1: Create and write the specified number of Entities to the
# Cloud Datastore.
- logging.info('Writing %s entities to %s', num_entities, project)
+ _LOGGER.info('Writing %s entities to %s', num_entities, project)
p = new_pipeline_with_job_name(pipeline_options, job_name, '-write')
# pylint: disable=expression-not-assigned
@@ -152,7 +155,7 @@
# Optional Pipeline 2: If a read limit was provided, read it and confirm
# that the expected entities were read.
if known_args.limit is not None:
- logging.info('Querying a limited set of %s entities and verifying count.',
+ _LOGGER.info('Querying a limited set of %s entities and verifying count.',
known_args.limit)
p = new_pipeline_with_job_name(pipeline_options, job_name, '-verify-limit')
query_with_limit = query_pb2.Query()
@@ -167,7 +170,7 @@
p.run()
# Pipeline 3: Query the written Entities and verify result.
- logging.info('Querying entities, asserting they match.')
+ _LOGGER.info('Querying entities, asserting they match.')
p = new_pipeline_with_job_name(pipeline_options, job_name, '-verify')
entities = p | 'read from datastore' >> ReadFromDatastore(project, query)
@@ -178,7 +181,7 @@
p.run()
# Pipeline 4: Delete Entities.
- logging.info('Deleting entities.')
+ _LOGGER.info('Deleting entities.')
p = new_pipeline_with_job_name(pipeline_options, job_name, '-delete')
entities = p | 'read from datastore' >> ReadFromDatastore(project, query)
# pylint: disable=expression-not-assigned
@@ -189,7 +192,7 @@
p.run()
# Pipeline 5: Query the written Entities, verify no results.
- logging.info('Querying for the entities to make sure there are none present.')
+ _LOGGER.info('Querying for the entities to make sure there are none present.')
p = new_pipeline_with_job_name(pipeline_options, job_name, '-verify-deleted')
entities = p | 'read from datastore' >> ReadFromDatastore(project, query)
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index dfdc29d..c1e0314 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -44,6 +44,9 @@
__all__ = ['GcsIO']
+_LOGGER = logging.getLogger(__name__)
+
+
# Issue a friendlier error message if the storage library is not available.
# TODO(silviuc): Remove this guard when storage is available everywhere.
try:
@@ -250,7 +253,7 @@
maxBytesRewrittenPerCall=max_bytes_rewritten_per_call)
response = self.client.objects.Rewrite(request)
while not response.done:
- logging.debug(
+ _LOGGER.debug(
'Rewrite progress: %d of %d bytes, %s to %s',
response.totalBytesRewritten, response.objectSize, src, dest)
request.rewriteToken = response.rewriteToken
@@ -258,7 +261,7 @@
if self._rewrite_cb is not None:
self._rewrite_cb(response)
- logging.debug('Rewrite done: %s to %s', src, dest)
+ _LOGGER.debug('Rewrite done: %s to %s', src, dest)
# We intentionally do not decorate this method with a retry, as retrying is
# handled in BatchApiRequest.Execute().
@@ -320,12 +323,12 @@
GcsIOError(errno.ENOENT, 'Source file not found: %s' % src))
pair_to_status[pair] = exception
elif not response.done:
- logging.debug(
+ _LOGGER.debug(
'Rewrite progress: %d of %d bytes, %s to %s',
response.totalBytesRewritten, response.objectSize, src, dest)
pair_to_request[pair].rewriteToken = response.rewriteToken
else:
- logging.debug('Rewrite done: %s to %s', src, dest)
+ _LOGGER.debug('Rewrite done: %s to %s', src, dest)
pair_to_status[pair] = None
return [(pair[0], pair[1], pair_to_status[pair]) for pair in src_dest_pairs]
@@ -458,7 +461,7 @@
file_sizes = {}
counter = 0
start_time = time.time()
- logging.info("Starting the size estimation of the input")
+ _LOGGER.info("Starting the size estimation of the input")
while True:
response = self.client.objects.List(request)
for item in response.items:
@@ -466,12 +469,12 @@
file_sizes[file_name] = item.size
counter += 1
if counter % 10000 == 0:
- logging.info("Finished computing size of: %s files", len(file_sizes))
+ _LOGGER.info("Finished computing size of: %s files", len(file_sizes))
if response.nextPageToken:
request.pageToken = response.nextPageToken
else:
break
- logging.info("Finished listing %s files in %s seconds.",
+ _LOGGER.info("Finished listing %s files in %s seconds.",
counter, time.time() - start_time)
return file_sizes
@@ -492,7 +495,7 @@
if http_error.status_code == 404:
raise IOError(errno.ENOENT, 'Not found: %s' % self._path)
else:
- logging.error('HTTP error while requesting file %s: %s', self._path,
+ _LOGGER.error('HTTP error while requesting file %s: %s', self._path,
http_error)
raise
self._size = metadata.size
@@ -564,7 +567,7 @@
try:
self._client.objects.Insert(self._insert_request, upload=self._upload)
except Exception as e: # pylint: disable=broad-except
- logging.error('Error in _start_upload while inserting file %s: %s',
+ _LOGGER.error('Error in _start_upload while inserting file %s: %s',
self._path, traceback.format_exc())
self._upload_thread.last_error = e
finally:
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_overrides.py b/sdks/python/apache_beam/io/gcp/gcsio_overrides.py
index a5fc749..9664348 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_overrides.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_overrides.py
@@ -27,6 +27,9 @@
from apitools.base.py import util
+_LOGGER = logging.getLogger(__name__)
+
+
class GcsIOOverrides(object):
"""Functions for overriding Google Cloud Storage I/O client."""
@@ -37,13 +40,13 @@
# handling GCS download throttling errors (BEAM-7424)
if (isinstance(retry_args.exc, exceptions.BadStatusCodeError) and
retry_args.exc.status_code == http_wrapper.TOO_MANY_REQUESTS):
- logging.debug(
+ _LOGGER.debug(
'Caught GCS quota error (%s), retrying.', retry_args.exc.status_code)
else:
return http_wrapper.HandleExceptionsAndRebuildHttpConnections(retry_args)
http_wrapper.RebuildHttpConnections(retry_args.http)
- logging.debug('Retrying request to url %s after exception %s',
+ _LOGGER.debug('Retrying request to url %s after exception %s',
retry_args.http_request.url, retry_args.exc)
sleep_seconds = util.CalculateWaitForRetry(
retry_args.num_retries, max_wait=retry_args.max_retry_wait)
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
index c3394a1..3ec3999 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -46,6 +46,9 @@
MAX_RETRIES = 5
+_LOGGER = logging.getLogger(__name__)
+
+
def retry_on_http_and_value_error(exception):
"""Filter allowing retries on Bigquery errors and value error."""
return isinstance(exception, (GoogleCloudError, ValueError))
@@ -83,10 +86,10 @@
def _matches(self, _):
if self.checksum is None:
response = self._query_with_retry()
- logging.info('Read from given query (%s), total rows %d',
+ _LOGGER.info('Read from given query (%s), total rows %d',
self.query, len(response))
self.checksum = compute_hash(response)
- logging.info('Generate checksum: %s', self.checksum)
+ _LOGGER.info('Generate checksum: %s', self.checksum)
return self.checksum == self.expected_checksum
@@ -95,7 +98,7 @@
retry_filter=retry_on_http_and_value_error)
def _query_with_retry(self):
"""Run Bigquery query with retry if got error http response"""
- logging.info('Attempting to perform query %s to BQ', self.query)
+ _LOGGER.info('Attempting to perform query %s to BQ', self.query)
# Create client here since it throws an exception if pickled.
bigquery_client = bigquery.Client(self.project)
query_job = bigquery_client.query(self.query)
@@ -134,7 +137,7 @@
def _matches(self, _):
if self.actual_data is None:
self.actual_data = self._get_query_result()
- logging.info('Result of query is: %r', self.actual_data)
+ _LOGGER.info('Result of query is: %r', self.actual_data)
try:
equal_to(self.expected_data)(self.actual_data)
@@ -179,7 +182,7 @@
response = self._query_with_retry()
if len(response) >= len(self.expected_data):
return response
- logging.debug('Query result contains %d rows' % len(response))
+ _LOGGER.debug('Query result contains %d rows' % len(response))
time.sleep(1)
if sys.version_info >= (3,):
raise TimeoutError('Timeout exceeded for matcher.') # noqa: F821
@@ -207,13 +210,13 @@
return bigquery_wrapper.get_table(self.project, self.dataset, self.table)
def _matches(self, _):
- logging.info('Start verify Bigquery table properties.')
+ _LOGGER.info('Start verify Bigquery table properties.')
# Run query
bigquery_wrapper = bigquery_tools.BigQueryWrapper()
self.actual_table = self._get_table_with_retry(bigquery_wrapper)
- logging.info('Table proto is %s', self.actual_table)
+ _LOGGER.info('Table proto is %s', self.actual_table)
return all(
self._match_property(v, self._get_or_none(self.actual_table, k))
@@ -231,7 +234,7 @@
@staticmethod
def _match_property(expected, actual):
- logging.info("Matching %s to %s", expected, actual)
+ _LOGGER.info("Matching %s to %s", expected, actual)
if isinstance(expected, dict):
return all(
BigQueryTableMatcher._match_property(
diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
index 7a0b5c8..aaa86f6 100644
--- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
@@ -40,6 +40,9 @@
MAX_MESSAGES_IN_ONE_PULL = 50
+_LOGGER = logging.getLogger(__name__)
+
+
class PubSubMessageMatcher(BaseMatcher):
"""Matcher that verifies messages from given subscription.
@@ -123,7 +126,7 @@
time.sleep(1)
if time.time() - start_time > timeout:
- logging.error('Timeout after %d sec. Received %d messages from %s.',
+ _LOGGER.error('Timeout after %d sec. Received %d messages from %s.',
timeout, len(total_messages), self.sub_name)
return total_messages
diff --git a/sdks/python/apache_beam/io/gcp/tests/utils.py b/sdks/python/apache_beam/io/gcp/tests/utils.py
index 4ed9af3..dbf8ac9 100644
--- a/sdks/python/apache_beam/io/gcp/tests/utils.py
+++ b/sdks/python/apache_beam/io/gcp/tests/utils.py
@@ -37,6 +37,9 @@
bigquery = None
+_LOGGER = logging.getLogger(__name__)
+
+
class GcpTestIOError(retry.PermanentException):
"""Basic GCP IO error for testing. Function that raises this error should
not be retried."""
@@ -93,7 +96,7 @@
dataset_id: Name of the dataset where table is.
table_id: Name of the table.
"""
- logging.info('Clean up a BigQuery table with project: %s, dataset: %s, '
+ _LOGGER.info('Clean up a BigQuery table with project: %s, dataset: %s, '
'table: %s.', project, dataset_id, table_id)
client = bigquery.Client(project=project)
table_ref = client.dataset(dataset_id).table(table_id)
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem.py b/sdks/python/apache_beam/io/hadoopfilesystem.py
index 71d74e8..1d7d5cc 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem.py
@@ -56,6 +56,9 @@
_FILE_STATUS_TYPE_FILE = 'FILE'
+_LOGGER = logging.getLogger(__name__)
+
+
class HdfsDownloader(filesystemio.Downloader):
def __init__(self, hdfs_client, path):
@@ -196,7 +199,7 @@
@staticmethod
def _add_compression(stream, path, mime_type, compression_type):
if mime_type != 'application/octet-stream':
- logging.warning('Mime types are not supported. Got non-default mime_type:'
+ _LOGGER.warning('Mime types are not supported. Got non-default mime_type:'
' %s', mime_type)
if compression_type == CompressionTypes.AUTO:
compression_type = CompressionTypes.detect_compression_type(path)
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index e21052f..dd97b7f 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -61,6 +61,9 @@
'Sink', 'Write', 'Writer']
+_LOGGER = logging.getLogger(__name__)
+
+
# Encapsulates information about a bundle of a source generated when method
# BoundedSource.split() is invoked.
# This is a named 4-tuple that has following fields.
@@ -1075,7 +1078,7 @@
write_results = list(write_results)
extra_shards = []
if len(write_results) < min_shards:
- logging.debug(
+ _LOGGER.debug(
'Creating %s empty shard(s).', min_shards - len(write_results))
for _ in range(min_shards - len(write_results)):
writer = sink.open_writer(init_result, str(uuid.uuid4()))
diff --git a/sdks/python/apache_beam/io/mongodbio.py b/sdks/python/apache_beam/io/mongodbio.py
index 6004ca1..5abab237 100644
--- a/sdks/python/apache_beam/io/mongodbio.py
+++ b/sdks/python/apache_beam/io/mongodbio.py
@@ -66,6 +66,10 @@
from apache_beam.transforms import Reshuffle
from apache_beam.utils.annotations import experimental
+
+_LOGGER = logging.getLogger(__name__)
+
+
try:
# Mongodb has its own bundled bson, which is not compatible with bson pakcage.
# (https://github.com/py-bson/bson/issues/82). Try to import objectid and if
@@ -80,7 +84,7 @@
from pymongo import ReplaceOne
except ImportError:
objectid = None
- logging.warning("Could not find a compatible bson package.")
+ _LOGGER.warning("Could not find a compatible bson package.")
__all__ = ['ReadFromMongoDB', 'WriteToMongoDB']
@@ -497,7 +501,7 @@
replacement=doc,
upsert=True))
resp = self.client[self.db][self.coll].bulk_write(requests)
- logging.debug('BulkWrite to MongoDB result in nModified:%d, nUpserted:%d, '
+ _LOGGER.debug('BulkWrite to MongoDB result in nModified:%d, nUpserted:%d, '
'nMatched:%d, Errors:%s' %
(resp.modified_count, resp.upserted_count, resp.matched_count,
resp.bulk_api_result.get('writeErrors')))
diff --git a/sdks/python/apache_beam/io/mongodbio_it_test.py b/sdks/python/apache_beam/io/mongodbio_it_test.py
index bfc6099..7189ac6 100644
--- a/sdks/python/apache_beam/io/mongodbio_it_test.py
+++ b/sdks/python/apache_beam/io/mongodbio_it_test.py
@@ -28,6 +28,9 @@
from apache_beam.testing.util import equal_to
+_LOGGER = logging.getLogger(__name__)
+
+
def run(argv=None):
default_db = 'beam_mongodbio_it_db'
default_coll = 'integration_test_%d' % time.time()
@@ -54,7 +57,7 @@
# Test Write to MongoDB
with TestPipeline(options=PipelineOptions(pipeline_args)) as p:
start_time = time.time()
- logging.info('Writing %d documents to mongodb' % known_args.num_documents)
+ _LOGGER.info('Writing %d documents to mongodb' % known_args.num_documents)
docs = [{
'number': x,
'number_mod_2': x % 2,
@@ -67,13 +70,13 @@
known_args.mongo_coll,
known_args.batch_size)
elapsed = time.time() - start_time
- logging.info('Writing %d documents to mongodb finished in %.3f seconds' %
+ _LOGGER.info('Writing %d documents to mongodb finished in %.3f seconds' %
(known_args.num_documents, elapsed))
# Test Read from MongoDB
with TestPipeline(options=PipelineOptions(pipeline_args)) as p:
start_time = time.time()
- logging.info('Reading from mongodb %s:%s' %
+ _LOGGER.info('Reading from mongodb %s:%s' %
(known_args.mongo_db, known_args.mongo_coll))
r = p | 'ReadFromMongoDB' >> \
beam.io.ReadFromMongoDB(known_args.mongo_uri,
@@ -85,7 +88,7 @@
r, equal_to([number for number in range(known_args.num_documents)]))
elapsed = time.time() - start_time
- logging.info('Read %d documents from mongodb finished in %.3f seconds' %
+ _LOGGER.info('Read %d documents from mongodb finished in %.3f seconds' %
(known_args.num_documents, elapsed))
diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py
index c46f801..d4845fb 100644
--- a/sdks/python/apache_beam/io/range_trackers.py
+++ b/sdks/python/apache_beam/io/range_trackers.py
@@ -34,6 +34,9 @@
'OrderedPositionRangeTracker', 'UnsplittableRangeTracker']
+_LOGGER = logging.getLogger(__name__)
+
+
class OffsetRangeTracker(iobase.RangeTracker):
"""A 'RangeTracker' for non-negative positions of type 'long'."""
@@ -137,27 +140,27 @@
assert isinstance(split_offset, (int, long))
with self._lock:
if self._stop_offset == OffsetRangeTracker.OFFSET_INFINITY:
- logging.debug('refusing to split %r at %d: stop position unspecified',
+ _LOGGER.debug('refusing to split %r at %d: stop position unspecified',
self, split_offset)
return
if self._last_record_start == -1:
- logging.debug('Refusing to split %r at %d: unstarted', self,
+ _LOGGER.debug('Refusing to split %r at %d: unstarted', self,
split_offset)
return
if split_offset <= self._last_record_start:
- logging.debug(
+ _LOGGER.debug(
'Refusing to split %r at %d: already past proposed stop offset',
self, split_offset)
return
if (split_offset < self.start_position()
or split_offset >= self.stop_position()):
- logging.debug(
+ _LOGGER.debug(
'Refusing to split %r at %d: proposed split position out of range',
self, split_offset)
return
- logging.debug('Agreeing to split %r at %d', self, split_offset)
+ _LOGGER.debug('Agreeing to split %r at %d', self, split_offset)
split_fraction = (float(split_offset - self._start_offset) / (
self._stop_offset - self._start_offset))
diff --git a/sdks/python/apache_beam/io/source_test_utils.py b/sdks/python/apache_beam/io/source_test_utils.py
index d83a62a..7291786 100644
--- a/sdks/python/apache_beam/io/source_test_utils.py
+++ b/sdks/python/apache_beam/io/source_test_utils.py
@@ -68,6 +68,9 @@
'assert_split_at_fraction_succeeds_and_consistent']
+_LOGGER = logging.getLogger(__name__)
+
+
class ExpectedSplitOutcome(object):
MUST_SUCCEED_AND_BE_CONSISTENT = 1
MUST_FAIL = 2
@@ -588,7 +591,7 @@
num_trials += 1
if (num_trials >
MAX_CONCURRENT_SPLITTING_TRIALS_PER_ITEM):
- logging.warning(
+ _LOGGER.warning(
'After %d concurrent splitting trials at item #%d, observed '
'only %s, giving up on this item',
num_trials,
@@ -604,7 +607,7 @@
have_failure = True
if have_success and have_failure:
- logging.info('%d trials to observe both success and failure of '
+ _LOGGER.info('%d trials to observe both success and failure of '
'concurrent splitting at item #%d', num_trials, i)
break
finally:
@@ -613,11 +616,11 @@
num_total_trials += num_trials
if num_total_trials > MAX_CONCURRENT_SPLITTING_TRIALS_TOTAL:
- logging.warning('After %d total concurrent splitting trials, considered '
+ _LOGGER.warning('After %d total concurrent splitting trials, considered '
'only %d items, giving up.', num_total_trials, i)
break
- logging.info('%d total concurrent splitting trials for %d items',
+ _LOGGER.info('%d total concurrent splitting trials for %d items',
num_total_trials, len(expected_items))
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index 340449f..3b426cc 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -42,6 +42,9 @@
'WriteToText']
+_LOGGER = logging.getLogger(__name__)
+
+
class _TextSource(filebasedsource.FileBasedSource):
r"""A source for reading text files.
@@ -127,7 +130,7 @@
raise ValueError('Cannot skip negative number of header lines: %d'
% skip_header_lines)
elif skip_header_lines > 10:
- logging.warning(
+ _LOGGER.warning(
'Skipping %d header lines. Skipping large number of header '
'lines might significantly slow down processing.')
self._skip_header_lines = skip_header_lines
diff --git a/sdks/python/apache_beam/io/tfrecordio.py b/sdks/python/apache_beam/io/tfrecordio.py
index 7b0bd87..ab7d2f5 100644
--- a/sdks/python/apache_beam/io/tfrecordio.py
+++ b/sdks/python/apache_beam/io/tfrecordio.py
@@ -38,6 +38,9 @@
__all__ = ['ReadFromTFRecord', 'WriteToTFRecord']
+_LOGGER = logging.getLogger(__name__)
+
+
def _default_crc32c_fn(value):
"""Calculates crc32c of a bytes object using either snappy or crcmod."""
@@ -54,7 +57,7 @@
pass
if not _default_crc32c_fn.fn:
- logging.warning('Couldn\'t find python-snappy so the implementation of '
+ _LOGGER.warning('Couldn\'t find python-snappy so the implementation of '
'_TFRecordUtil._masked_crc32c is not as fast as it could '
'be.')
_default_crc32c_fn.fn = crcmod.predefined.mkPredefinedCrcFun('crc-32c')
diff --git a/sdks/python/apache_beam/io/vcfio.py b/sdks/python/apache_beam/io/vcfio.py
index 2a13bf8..aed3579 100644
--- a/sdks/python/apache_beam/io/vcfio.py
+++ b/sdks/python/apache_beam/io/vcfio.py
@@ -51,6 +51,9 @@
__all__ = ['ReadFromVcf', 'Variant', 'VariantCall', 'VariantInfo',
'MalformedVcfRecord']
+
+_LOGGER = logging.getLogger(__name__)
+
# Stores data about variant INFO fields. The type of 'data' is specified in the
# VCF headers. 'field_count' is a string that specifies the number of fields
# that the data type contains. Its value can either be a number representing a
@@ -346,7 +349,7 @@
self._vcf_reader.formats)
except (LookupError, ValueError):
if self._allow_malformed_records:
- logging.warning(
+ _LOGGER.warning(
'An exception was raised when reading record from VCF file '
'%s. Invalid record was %s: %s',
self._file_name, self._last_record, traceback.format_exc())