| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """ |
| Beam Datastore types. |
| |
| This module is experimental, no backwards compatibility guarantees. |
| """ |
| |
| from __future__ import absolute_import |
| |
| import copy |
| |
| from google.cloud.datastore import entity |
| from google.cloud.datastore import key |
| from google.cloud.datastore import query |
| |
| from apache_beam.options.value_provider import ValueProvider |
| |
| __all__ = ['Query', 'Key', 'Entity'] |
| |
| |
| class Query(object): |
| def __init__(self, kind=None, project=None, namespace=None, ancestor=None, |
| filters=(), projection=(), order=(), distinct_on=(), limit=None): |
| """Represents a Datastore query. |
| |
| Args: |
| kind: (str) The kind to query. |
| project: (str) Required. Project associated with query. |
| namespace: (str) (Optional) Namespace to restrict results to. |
| ancestor: (:class:`~apache_beam.io.gcp.datastore.v1new.types.Key`) |
| (Optional) key of the ancestor to which this query's results are |
| restricted. |
| filters: (sequence of tuple[str, str, str], |
| sequence of |
| tuple[ValueProvider(str), ValueProvider(str), ValueProvider(str)]) |
| Property filters applied by this query. |
| The sequence is ``(property_name, operator, value)``. |
| projection: (sequence of string) fields returned as part of query results. |
| order: (sequence of string) field names used to order query results. |
| Prepend ``-`` to a field name to sort it in descending order. |
| distinct_on: (sequence of string) field names used to group query |
| results. |
| limit: (int) Maximum amount of results to return. |
| """ |
| self.kind = kind |
| self.project = project |
| self.namespace = namespace |
| self.ancestor = ancestor |
| self.filters = filters or () |
| self.projection = projection |
| self.order = order |
| self.distinct_on = distinct_on |
| self.limit = limit |
| |
| def _to_client_query(self, client): |
| """ |
| Returns a ``google.cloud.datastore.query.Query`` instance that represents |
| this query. |
| |
| Args: |
| client: (``google.cloud.datastore.client.Client``) Datastore client |
| instance to use. |
| """ |
| ancestor_client_key = None |
| if self.ancestor is not None: |
| ancestor_client_key = self.ancestor.to_client_key() |
| |
| self.filters = self._set_runtime_filters() |
| |
| return query.Query( |
| client, kind=self.kind, project=self.project, namespace=self.namespace, |
| ancestor=ancestor_client_key, filters=self.filters, |
| projection=self.projection, order=self.order, |
| distinct_on=self.distinct_on) |
| |
| def _set_runtime_filters(self): |
| """ |
| Extracts values from ValueProviders in `self.filters` if available |
| :param filters: sequence of tuple[str, str, str] or |
| sequence of tuple[ValueProvider, ValueProvider, ValueProvider] |
| :return: tuple[str, str, str] |
| """ |
| runtime_filters = [] |
| if not all(len(filter_tuple) == 3 for filter_tuple in self.filters): |
| raise TypeError('%s: filters must be a sequence of tuple with length=3' |
| ' got %r instead' |
| % (self.__class__.__name__, self.filters)) |
| |
| for filter_type, filter_operator, filter_value in self.filters: |
| if isinstance(filter_type, ValueProvider): |
| filter_type = filter_type.get() |
| if isinstance(filter_operator, ValueProvider): |
| filter_operator = filter_operator.get() |
| if isinstance(filter_value, ValueProvider): |
| filter_value = filter_value.get() |
| runtime_filters.append((filter_type, filter_operator, filter_value)) |
| |
| return runtime_filters or () |
| |
| def clone(self): |
| return copy.copy(self) |
| |
| def __repr__(self): |
| return ('<Query(kind=%s, project=%s, namespace=%s, ancestor=%s, filters=%s,' |
| 'projection=%s, order=%s, distinct_on=%s, limit=%s)>' % ( |
| self.kind, self.project, self.namespace, self.ancestor, |
| self.filters, self.projection, self.order, self.distinct_on, |
| self.limit)) |
| |
| |
| class Key(object): |
| def __init__(self, path_elements, parent=None, project=None, namespace=None): |
| """ |
| Represents a Datastore key. |
| |
| The partition ID is represented by its components: namespace and project. |
| If key has a parent, project and namespace should either be unset or match |
| the parent's. |
| |
| Args: |
| path_elements: (list of str and int) Key path: an alternating sequence of |
| kind and identifier. The kind must be of type ``str`` and identifier may |
| be a ``str`` or an ``int``. |
| If the last identifier is omitted this is an incomplete key, which is |
| unsupported in ``WriteToDatastore`` and ``DeleteFromDatastore``. |
| See :class:`google.cloud.datastore.key.Key` for more details. |
| parent: (:class:`~apache_beam.io.gcp.datastore.v1new.types.Key`) |
| (optional) Parent for this key. |
| project: (str) Project ID. Required unless set by parent. |
| namespace: (str) (optional) Namespace ID |
| """ |
| # Verification or arguments is delegated to to_client_key(). |
| self.path_elements = tuple(path_elements) |
| self.parent = parent |
| self.namespace = namespace |
| self.project = project |
| |
| @staticmethod |
| def from_client_key(client_key): |
| return Key(client_key.flat_path, project=client_key.project, |
| namespace=client_key.namespace) |
| |
| def to_client_key(self): |
| """ |
| Returns a :class:`google.cloud.datastore.key.Key` instance that represents |
| this key. |
| """ |
| parent = self.parent |
| if parent is not None: |
| parent = parent.to_client_key() |
| return key.Key(*self.path_elements, parent=parent, namespace=self.namespace, |
| project=self.project) |
| |
| def __eq__(self, other): |
| if not isinstance(other, Key): |
| return False |
| if self.path_elements != other.path_elements: |
| return False |
| if self.project != other.project: |
| return False |
| if self.parent is not None and other.parent is not None: |
| return self.parent == other.parent |
| |
| return self.parent is None and other.parent is None |
| |
| __hash__ = None |
| |
| def __repr__(self): |
| return '<%s(%s, parent=%s, project=%s, namespace=%s)>' % ( |
| self.__class__.__name__, str(self.path_elements), str(self.parent), |
| self.project, self.namespace) |
| |
| |
| class Entity(object): |
| def __init__(self, key, exclude_from_indexes=()): |
| """ |
| Represents a Datastore entity. |
| |
| Does not support the property value "meaning" field. |
| |
| Args: |
| key: (Key) A complete Key representing this Entity. |
| exclude_from_indexes: (iterable of str) List of property keys whose values |
| should not be indexed for this entity. |
| """ |
| self.key = key |
| self.exclude_from_indexes = set(exclude_from_indexes) |
| self.properties = {} |
| |
| def set_properties(self, property_dict): |
| """Sets a dictionary of properties on this entity. |
| |
| Args: |
| property_dict: A map from property name to value. See |
| :class:`google.cloud.datastore.entity.Entity` documentation for allowed |
| values. |
| """ |
| self.properties.update(property_dict) |
| |
| @staticmethod |
| def from_client_entity(client_entity): |
| res = Entity( |
| Key.from_client_key(client_entity.key), |
| exclude_from_indexes=set(client_entity.exclude_from_indexes)) |
| for name, value in client_entity.items(): |
| if isinstance(value, key.Key): |
| value = Key.from_client_key(value) |
| res.properties[name] = value |
| return res |
| |
| def to_client_entity(self): |
| """ |
| Returns a :class:`google.cloud.datastore.entity.Entity` instance that |
| represents this entity. |
| """ |
| res = entity.Entity(key=self.key.to_client_key(), |
| exclude_from_indexes=tuple(self.exclude_from_indexes)) |
| for name, value in self.properties.items(): |
| if isinstance(value, Key): |
| if not value.project: |
| value.project = self.key.project |
| value = value.to_client_key() |
| res[name] = value |
| return res |
| |
| def __eq__(self, other): |
| if not isinstance(other, Entity): |
| return False |
| return (self.key == other.key and |
| self.exclude_from_indexes == other.exclude_from_indexes and |
| self.properties == other.properties) |
| |
| __hash__ = None |
| |
| def __repr__(self): |
| return "<%s(key=%s, exclude_from_indexes=%s) properties=%s>" % ( |
| self.__class__.__name__, str(self.key), |
| str(self.exclude_from_indexes), str(self.properties)) |