blob: f8664a0f921ffac2136aeb4fe1263d9984cd1fa8 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Unit tests for BigTable service."""
# pytype: skip-file
import datetime
import string
import unittest
import uuid
from random import choice
from mock import MagicMock
from mock import patch
from apache_beam.internal.metrics.metric import ServiceCallMetric
from apache_beam.io.gcp import bigtableio
from apache_beam.io.gcp import resource_identifiers
from apache_beam.metrics import monitoring_infos
from apache_beam.metrics.execution import MetricsEnvironment
# Protect against environments where bigtable library is not available.
try:
from google.cloud.bigtable import client, row
from google.cloud.bigtable.instance import Instance
from google.cloud.bigtable.table import Table
from google.rpc.code_pb2 import OK, ALREADY_EXISTS
from google.rpc.status_pb2 import Status
except ImportError as e:
client = None
@unittest.skipIf(client is None, 'Bigtable dependencies are not installed')
class TestWriteBigTable(unittest.TestCase):
  """Checks the API-request metrics reported by ``_BigTableWriteFn``.

  ``Table.mutate_rows`` is patched, so no request reaches a real Bigtable
  instance; the project/instance/table ids are random per class load.
  """
  TABLE_PREFIX = "python-test"
  _PROJECT_ID = TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
  _INSTANCE_ID = TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
  _TABLE_ID = TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]

  def setUp(self):
    # Build a Table around a mocked client. The local is deliberately not
    # named ``client`` so it does not shadow the module-level import-guard
    # name used by the skipIf decorator.
    mock_client = MagicMock()
    instance = Instance(self._INSTANCE_ID, mock_client)
    self.table = Table(self._TABLE_ID, instance)

  def test_write_metrics(self):
    """Each per-row MutateRows status is counted under its gRPC status."""
    MetricsEnvironment.process_wide_container().reset()
    write_fn = bigtableio._BigTableWriteFn(
        self._PROJECT_ID, self._INSTANCE_ID, self._TABLE_ID)
    # Inject the mocked table so the DoFn does not create a real one.
    write_fn.table = self.table
    write_fn.start_bundle()
    number_of_rows = 2
    error = Status()
    error.message = 'Entity already exists.'
    error.code = ALREADY_EXISTS
    success = Status()
    success.message = 'Success'
    success.code = OK
    # One status per written row: number_of_rows ALREADY_EXISTS responses and
    # number_of_rows OK responses, interleaved.
    rows_response = [error, success] * number_of_rows
    with patch.object(Table, 'mutate_rows', return_value=rows_response):
      direct_rows = [self.generate_row(i) for i in range(number_of_rows * 2)]
      for direct_row in direct_rows:
        write_fn.process(direct_row)
      try:
        write_fn.finish_bundle()
      except Exception:  # pylint: disable=broad-except
        # Currently we fail the bundle when there are any failures.
        # TODO(https://github.com/apache/beam/issues/21396): remove after
        # bigtableio can selectively retry.
        # Catch Exception (not a bare except) so KeyboardInterrupt and
        # SystemExit still propagate.
        pass
      self.verify_write_call_metric(
          self._PROJECT_ID,
          self._INSTANCE_ID,
          self._TABLE_ID,
          ServiceCallMetric.bigtable_error_code_to_grpc_status_string(
              ALREADY_EXISTS),
          number_of_rows)
      self.verify_write_call_metric(
          self._PROJECT_ID,
          self._INSTANCE_ID,
          self._TABLE_ID,
          ServiceCallMetric.bigtable_error_code_to_grpc_status_string(OK),
          number_of_rows)

  def generate_row(self, index=0):
    """Build a ``DirectRow`` with ten cells in column family ``cf1``.

    Args:
      index: Row number; the key is ``beam_key`` followed by ``index``
        zero-padded to seven digits.

    Returns:
      A ``row.DirectRow`` whose cells ``field0`` .. ``field9`` all hold the
      same random 100-character alphanumeric value.
    """
    alphabet = string.ascii_letters + string.digits
    # Draw every character independently so the value is a real random
    # string rather than one character repeated 100 times.
    value = ''.join(choice(alphabet) for _ in range(100))
    column_family_id = 'cf1'
    key = "beam_key%s" % ('{0:07}'.format(index))
    direct_row = row.DirectRow(row_key=key)
    for column_id in range(10):
      direct_row.set_cell(
          column_family_id, ('field%s' % column_id).encode('utf-8'),
          value,
          # Use an aware UTC timestamp: the Bigtable client interprets naive
          # datetimes as UTC, so a naive local time would be skewed by the
          # host's UTC offset.
          datetime.datetime.now(datetime.timezone.utc))
    return direct_row

  def verify_write_call_metric(
      self, project_id, instance_id, table_id, status, count):
    """Check that a metric was recorded for the Bigtable MutateRows API call.

    Looks for a process-wide ``API_REQUEST_COUNT_URN`` int64 counter whose
    labels identify the given table, the ``google.bigtable.v2.MutateRows``
    method and ``status``, and whose value is ``count``.

    Args:
      project_id: Bigtable project id expected in the labels.
      instance_id: Bigtable instance id expected in the labels.
      table_id: Bigtable table id expected in the labels.
      status: Expected gRPC status string label.
      count: Expected counter value.

    Raises:
      AssertionError: if no matching monitoring info was recorded.
    """
    process_wide_monitoring_infos = list(
        MetricsEnvironment.process_wide_container().
        to_runner_api_monitoring_infos(None).values())
    resource = resource_identifiers.BigtableTable(
        project_id, instance_id, table_id)
    labels = {
        monitoring_infos.SERVICE_LABEL: 'BigTable',
        monitoring_infos.METHOD_LABEL: 'google.bigtable.v2.MutateRows',
        monitoring_infos.RESOURCE_LABEL: resource,
        monitoring_infos.BIGTABLE_PROJECT_ID_LABEL: project_id,
        monitoring_infos.INSTANCE_ID_LABEL: instance_id,
        monitoring_infos.TABLE_ID_LABEL: table_id,
        monitoring_infos.STATUS_LABEL: status
    }
    expected_mi = monitoring_infos.int64_counter(
        monitoring_infos.API_REQUEST_COUNT_URN, count, labels=labels)
    # Compare everything except start_time, which is cleared on both sides.
    expected_mi.ClearField("start_time")
    found = False
    for actual_mi in process_wide_monitoring_infos:
      actual_mi.ClearField("start_time")
      if expected_mi == actual_mi:
        found = True
        break
    self.assertTrue(
        found, "Did not find write call metric with status: %s" % status)
if __name__ == '__main__':
  # Only taken when this file is executed directly; when the module is
  # imported (e.g. collected by a test runner) __name__ differs and nothing
  # runs here. Delegate to unittest's command-line entry point.
  unittest.main()