blob: a6fc8166a2be5b936efd6641e9b9347dd0e74317 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
A connector for reading from and writing to Google Cloud Datastore.
Please avoid using this module for new pipelines since
``apache_beam.io.gcp.datastore.v1new.datastoreio`` will replace it in the
next Beam major release.
"""
from __future__ import absolute_import
from __future__ import division
import logging
import sys
import time
import warnings
from builtins import round
from apache_beam.io.gcp.datastore.v1 import helper
from apache_beam.io.gcp.datastore.v1 import query_splitter
from apache_beam.io.gcp.datastore.v1 import util
from apache_beam.io.gcp.datastore.v1.adaptive_throttler import AdaptiveThrottler
from apache_beam.metrics.metric import Metrics
from apache_beam.transforms import Create
from apache_beam.transforms import DoFn
from apache_beam.transforms import FlatMap
from apache_beam.transforms import GroupByKey
from apache_beam.transforms import Map
from apache_beam.transforms import ParDo
from apache_beam.transforms import PTransform
from apache_beam.transforms.util import Values
_LOGGER = logging.getLogger(__name__)
# Protect against environments where datastore library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
try:
from google.cloud.proto.datastore.v1 import datastore_pb2
from googledatastore import helper as datastore_helper
_LOGGER.warning(
'Using deprecated Datastore client.\n'
'This client will be removed in Beam 3.0 (next Beam major release).\n'
'Please migrate to apache_beam.io.gcp.datastore.v1new.datastoreio.')
except ImportError:
if sys.version_info[0] == 3:
warnings.warn('Datastore IO will support Python 3 after replacing '
'googledatastore by google-cloud-datastore, '
'see: BEAM-4543.')
# pylint: enable=wrong-import-order, wrong-import-position
__all__ = ['ReadFromDatastore', 'WriteToDatastore', 'DeleteFromDatastore']
class ReadFromDatastore(PTransform):
"""A ``PTransform`` for reading from Google Cloud Datastore.
To read a ``PCollection[Entity]`` from a Cloud Datastore ``Query``, use
``ReadFromDatastore`` transform by providing a `project` id and a `query` to
read from. You can optionally provide a `namespace` and/or specify how many
splits you want for the query through `num_splits` option.
Note: Normally, a runner will read from Cloud Datastore in parallel across
many workers. However, when the `query` is configured with a `limit` or if the
query contains inequality filters like `GREATER_THAN, LESS_THAN` etc., then
all the returned results will be read by a single worker in order to ensure
correct data. Since data is read from a single worker, this could have
significant impact on the performance of the job.
The semantics for the query splitting is defined below:
1. If `num_splits` is equal to 0, then the number of splits will be chosen
dynamically at runtime based on the query data size.
2. Any value of `num_splits` greater than
`ReadFromDatastore._NUM_QUERY_SPLITS_MAX` will be capped at that value.
3. If the `query` has a user limit set, or contains inequality filters, then
`num_splits` will be ignored and no split will be performed.
4. Under certain cases Cloud Datastore is unable to split query to the
requested number of splits. In such cases we just use whatever the Cloud
Datastore returns.
See https://developers.google.com/datastore/ for more details on Google Cloud
Datastore.
"""
# An upper bound on the number of splits for a query.
_NUM_QUERY_SPLITS_MAX = 50000
# A lower bound on the number of splits for a query. This is to ensure that
# we parellelize the query even when Datastore statistics are not available.
_NUM_QUERY_SPLITS_MIN = 12
# Default bundle size of 64MB.
_DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024
def __init__(self, project, query, namespace=None, num_splits=0):
"""Initialize the `ReadFromDatastore` transform.
Args:
project: The ID of the project to read from.
query: Cloud Datastore query to be read from.
namespace: An optional namespace.
num_splits: Number of splits for the query.
"""
# Import here to avoid adding the dependency for local running scenarios.
try:
# pylint: disable=wrong-import-order, wrong-import-position
from apitools.base import py # pylint: disable=unused-import
except ImportError:
raise ImportError(
'Google Cloud IO not available, '
'please install apache_beam[gcp]')
_LOGGER.warning('datastoreio read transform is experimental.')
super(ReadFromDatastore, self).__init__()
if not project:
raise ValueError("Project cannot be empty")
if not query:
raise ValueError("Query cannot be empty")
if num_splits < 0:
raise ValueError("num_splits must be greater than or equal 0")
self._project = project
# using _namespace conflicts with DisplayData._namespace
self._datastore_namespace = namespace
self._query = query
self._num_splits = num_splits
def expand(self, pcoll):
# This is a composite transform involves the following:
# 1. Create a singleton of the user provided `query` and apply a ``ParDo``
# that splits the query into `num_splits` and assign each split query a
# unique `int` as the key. The resulting output is of the type
# ``PCollection[(int, Query)]``.
#
# If the value of `num_splits` is less than or equal to 0, then the
# number of splits will be computed dynamically based on the size of the
# data for the `query`.
#
# 2. The resulting ``PCollection`` is sharded using a ``GroupByKey``
# operation. The queries are extracted from the (int, Iterable[Query]) and
# flattened to output a ``PCollection[Query]``.
#
# 3. In the third step, a ``ParDo`` reads entities for each query and
# outputs a ``PCollection[Entity]``.
queries = (pcoll.pipeline
| 'UserQuery' >> Create([self._query])
| 'SplitQuery' >> ParDo(ReadFromDatastore.SplitQueryFn(
self._project, self._query, self._datastore_namespace,
self._num_splits)))
sharded_queries = (queries
| GroupByKey()
| Values()
| 'Flatten' >> FlatMap(lambda x: x))
entities = sharded_queries | 'Read' >> ParDo(
ReadFromDatastore.ReadFn(self._project, self._datastore_namespace))
return entities
def display_data(self):
disp_data = {'project': self._project,
'query': str(self._query),
'num_splits': self._num_splits}
if self._datastore_namespace is not None:
disp_data['namespace'] = self._datastore_namespace
return disp_data
class SplitQueryFn(DoFn):
"""A `DoFn` that splits a given query into multiple sub-queries."""
def __init__(self, project, query, namespace, num_splits):
super(ReadFromDatastore.SplitQueryFn, self).__init__()
self._datastore = None
self._project = project
self._datastore_namespace = namespace
self._query = query
self._num_splits = num_splits
def start_bundle(self):
self._datastore = helper.get_datastore(self._project)
def process(self, query, *args, **kwargs):
# distinct key to be used to group query splits.
key = 1
# If query has a user set limit, then the query cannot be split.
if query.HasField('limit'):
return [(key, query)]
# Compute the estimated numSplits if not specified by the user.
if self._num_splits == 0:
estimated_num_splits = ReadFromDatastore.get_estimated_num_splits(
self._project, self._datastore_namespace, self._query,
self._datastore)
else:
estimated_num_splits = self._num_splits
_LOGGER.info("Splitting the query into %d splits", estimated_num_splits)
try:
query_splits = query_splitter.get_splits(
self._datastore, query, estimated_num_splits,
helper.make_partition(self._project, self._datastore_namespace))
except Exception:
_LOGGER.warning("Unable to parallelize the given query: %s", query,
exc_info=True)
query_splits = [query]
sharded_query_splits = []
for split_query in query_splits:
sharded_query_splits.append((key, split_query))
key += 1
return sharded_query_splits
def display_data(self):
disp_data = {'project': self._project,
'query': str(self._query),
'num_splits': self._num_splits}
if self._datastore_namespace is not None:
disp_data['namespace'] = self._datastore_namespace
return disp_data
class ReadFn(DoFn):
"""A DoFn that reads entities from Cloud Datastore, for a given query."""
def __init__(self, project, namespace=None):
super(ReadFromDatastore.ReadFn, self).__init__()
self._project = project
self._datastore_namespace = namespace
self._datastore = None
def start_bundle(self):
self._datastore = helper.get_datastore(self._project)
def process(self, query, *args, **kwargs):
# Returns an iterator of entities that reads in batches.
entities = helper.fetch_entities(self._project, self._datastore_namespace,
query, self._datastore)
return entities
def display_data(self):
disp_data = {'project': self._project}
if self._datastore_namespace is not None:
disp_data['namespace'] = self._datastore_namespace
return disp_data
@staticmethod
def query_latest_statistics_timestamp(project, namespace, datastore):
"""Fetches the latest timestamp of statistics from Cloud Datastore.
Cloud Datastore system tables with statistics are periodically updated.
This method fethes the latest timestamp (in microseconds) of statistics
update using the `__Stat_Total__` table.
"""
query = helper.make_latest_timestamp_query(namespace)
req = helper.make_request(project, namespace, query)
resp = datastore.run_query(req)
if not resp.batch.entity_results:
raise RuntimeError("Datastore total statistics unavailable.")
entity = resp.batch.entity_results[0].entity
return datastore_helper.micros_from_timestamp(
entity.properties['timestamp'].timestamp_value)
@staticmethod
def get_estimated_size_bytes(project, namespace, query, datastore):
"""Get the estimated size of the data returned by the given query.
Cloud Datastore provides no way to get a good estimate of how large the
result of a query is going to be. Hence we use the __Stat_Kind__ system
table to get size of the entire kind as an approximate estimate, assuming
exactly 1 kind is specified in the query.
See https://cloud.google.com/datastore/docs/concepts/stats.
"""
kind = query.kind[0].name
latest_timestamp = ReadFromDatastore.query_latest_statistics_timestamp(
project, namespace, datastore)
_LOGGER.info('Latest stats timestamp for kind %s is %s',
kind, latest_timestamp)
kind_stats_query = (
helper.make_kind_stats_query(namespace, kind, latest_timestamp))
req = helper.make_request(project, namespace, kind_stats_query)
resp = datastore.run_query(req)
if not resp.batch.entity_results:
raise RuntimeError("Datastore statistics for kind %s unavailable" % kind)
entity = resp.batch.entity_results[0].entity
return datastore_helper.get_value(entity.properties['entity_bytes'])
@staticmethod
def get_estimated_num_splits(project, namespace, query, datastore):
"""Computes the number of splits to be performed on the given query."""
try:
estimated_size_bytes = ReadFromDatastore.get_estimated_size_bytes(
project, namespace, query, datastore)
_LOGGER.info('Estimated size bytes for query: %s', estimated_size_bytes)
num_splits = int(min(ReadFromDatastore._NUM_QUERY_SPLITS_MAX, round(
(float(estimated_size_bytes) /
ReadFromDatastore._DEFAULT_BUNDLE_SIZE_BYTES))))
except Exception as e:
_LOGGER.warning('Failed to fetch estimated size bytes: %s', e)
# Fallback in case estimated size is unavailable.
num_splits = ReadFromDatastore._NUM_QUERY_SPLITS_MIN
return max(num_splits, ReadFromDatastore._NUM_QUERY_SPLITS_MIN)
class _Mutate(PTransform):
"""A ``PTransform`` that writes mutations to Cloud Datastore.
Only idempotent Datastore mutation operations (upsert and delete) are
supported, as the commits are retried when failures occur.
"""
def __init__(self, project, mutation_fn):
"""Initializes a Mutate transform.
Args:
project: The Project ID
mutation_fn: A function that converts `entities` or `keys` to
`mutations`.
"""
self._project = project
self._mutation_fn = mutation_fn
_LOGGER.warning('datastoreio write transform is experimental.')
def expand(self, pcoll):
return (pcoll
| 'Convert to Mutation' >> Map(self._mutation_fn)
| 'Write Mutation to Datastore' >> ParDo(_Mutate.DatastoreWriteFn(
self._project)))
def display_data(self):
return {'project': self._project,
'mutation_fn': self._mutation_fn.__class__.__name__}
class DatastoreWriteFn(DoFn):
"""A ``DoFn`` that write mutations to Datastore.
Mutations are written in batches, where the maximum batch size is
`util.WRITE_BATCH_SIZE`.
Commits are non-transactional. If a commit fails because of a conflict over
an entity group, the commit will be retried. This means that the mutation
should be idempotent (`upsert` and `delete` mutations) to prevent duplicate
data or errors.
"""
def __init__(self, project, fixed_batch_size=None):
"""
Args:
project: str, the cloud project id.
fixed_batch_size: int, for testing only, this forces all batches of
writes to be a fixed size, for easier unittesting.
"""
self._project = project
self._datastore = None
self._fixed_batch_size = fixed_batch_size
self._rpc_successes = Metrics.counter(
_Mutate.DatastoreWriteFn, "datastoreRpcSuccesses")
self._rpc_errors = Metrics.counter(
_Mutate.DatastoreWriteFn, "datastoreRpcErrors")
self._throttled_secs = Metrics.counter(
_Mutate.DatastoreWriteFn, "cumulativeThrottlingSeconds")
self._throttler = AdaptiveThrottler(window_ms=120000, bucket_ms=1000,
overload_ratio=1.25)
def _update_rpc_stats(self, successes=0, errors=0, throttled_secs=0):
self._rpc_successes.inc(successes)
self._rpc_errors.inc(errors)
self._throttled_secs.inc(throttled_secs)
def start_bundle(self):
self._mutations = []
self._mutations_size = 0
self._datastore = helper.get_datastore(self._project)
if self._fixed_batch_size:
self._target_batch_size = self._fixed_batch_size
else:
self._batch_sizer = util.DynamicBatchSizer()
self._target_batch_size = self._batch_sizer.get_batch_size(
time.time()*1000)
def process(self, element):
size = element.ByteSize()
if (self._mutations and
size + self._mutations_size > util.WRITE_BATCH_MAX_BYTES_SIZE):
self._flush_batch()
self._mutations.append(element)
self._mutations_size += size
if len(self._mutations) >= self._target_batch_size:
self._flush_batch()
def finish_bundle(self):
if self._mutations:
self._flush_batch()
def _flush_batch(self):
# Flush the current batch of mutations to Cloud Datastore.
_, latency_ms = helper.write_mutations(
self._datastore, self._project, self._mutations,
self._throttler, self._update_rpc_stats,
throttle_delay=util.WRITE_BATCH_TARGET_LATENCY_MS//1000)
_LOGGER.debug("Successfully wrote %d mutations in %dms.",
len(self._mutations), latency_ms)
if not self._fixed_batch_size:
now = time.time()*1000
self._batch_sizer.report_latency(now, latency_ms, len(self._mutations))
self._target_batch_size = self._batch_sizer.get_batch_size(now)
self._mutations = []
self._mutations_size = 0
class WriteToDatastore(_Mutate):
"""A ``PTransform`` to write a ``PCollection[Entity]`` to Cloud Datastore."""
def __init__(self, project):
"""Initialize the `WriteToDatastore` transform.
Args:
project: The ID of the project to write to.
"""
# Import here to avoid adding the dependency for local running scenarios.
try:
# pylint: disable=wrong-import-order, wrong-import-position
from apitools.base import py # pylint: disable=unused-import
except ImportError:
raise ImportError(
'Google Cloud IO not available, '
'please install apache_beam[gcp]')
super(WriteToDatastore, self).__init__(
project, WriteToDatastore.to_upsert_mutation)
@staticmethod
def to_upsert_mutation(entity):
if not helper.is_key_valid(entity.key):
raise ValueError('Entities to be written to the Cloud Datastore must '
'have complete keys:\n%s' % entity)
mutation = datastore_pb2.Mutation()
mutation.upsert.CopyFrom(entity)
return mutation
class DeleteFromDatastore(_Mutate):
"""A ``PTransform`` to delete a ``PCollection[Key]`` from Cloud Datastore."""
def __init__(self, project):
"""Initialize the `DeleteFromDatastore` transform.
Args:
project: The ID of the project from which the entities will be deleted.
"""
super(DeleteFromDatastore, self).__init__(
project, DeleteFromDatastore.to_delete_mutation)
@staticmethod
def to_delete_mutation(key):
if not helper.is_key_valid(key):
raise ValueError('Keys to be deleted from the Cloud Datastore must be '
'complete:\n%s', key)
mutation = datastore_pb2.Mutation()
mutation.delete.CopyFrom(key)
return mutation