| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import logging |
| import uuid |
| import weakref |
| |
| from phoenixdb import errors |
| from phoenixdb.cursor import Cursor |
| from phoenixdb.errors import ProgrammingError |
| from phoenixdb.meta import Meta |
| |
| __all__ = ['Connection'] |
| |
| logger = logging.getLogger(__name__) |
| |
| AVATICA_PROPERTIES = ('autoCommit', 'autocommit', 'readOnly', 'readonly', 'transactionIsolation', |
| 'catalog', 'schema') |
| |
| |
| class Connection(object): |
| """Database connection. |
| |
| You should not construct this object manually, use :func:`~phoenixdb.connect` instead. |
| """ |
| |
| cursor_factory = None |
| """ |
| The default cursor factory used by :meth:`cursor` if the parameter is not specified. |
| """ |
| |
| def __init__(self, client, cursor_factory=None, **kwargs): |
| self._client = client |
| self._closed = False |
| if cursor_factory is not None: |
| self.cursor_factory = cursor_factory |
| else: |
| self.cursor_factory = Cursor |
| self._cursors = [] |
| self._phoenix_props, avatica_props_init = Connection._map_conn_props(kwargs) |
| self.open() |
| |
| # TODO we could probably optimize it away if the defaults are not changed |
| self.set_session(**avatica_props_init) |
| |
| def __del__(self): |
| if not self._closed: |
| self.close() |
| |
| def __enter__(self): |
| return self |
| |
| def __exit__(self, exc_type, exc_value, traceback): |
| if not self._closed: |
| self.close() |
| |
| @staticmethod |
| def _default_avatica_props(): |
| return {'autoCommit': False, |
| 'readOnly': False, |
| 'transactionIsolation': 0, |
| 'catalog': '', |
| 'schema': ''} |
| |
| @staticmethod |
| def _map_conn_props(conn_props): |
| """Sorts and prepocesses args that should be passed to Phoenix and Avatica""" |
| |
| avatica_props = dict([(k, conn_props[k]) for k in conn_props.keys() if k in AVATICA_PROPERTIES]) |
| phoenix_props = dict([(k, conn_props[k]) for k in conn_props.keys() if k not in AVATICA_PROPERTIES]) |
| avatica_props = Connection._map_legacy_avatica_props(avatica_props) |
| |
| return (phoenix_props, avatica_props) |
| |
| @staticmethod |
| def _map_legacy_avatica_props(props): |
| if 'autocommit' in props: |
| props['autoCommit'] = bool(props.pop('autocommit')) |
| if 'readonly' in props: |
| props['readOnly'] = bool(props.pop('readonly')) |
| return props |
| |
| def open(self): |
| """Opens the connection.""" |
| self._id = str(uuid.uuid4()) |
| self._client.open_connection(self._id, info=self._phoenix_props) |
| |
| def close(self): |
| """Closes the connection. |
| No further operations are allowed, either on the connection or any |
| of its cursors, once the connection is closed. |
| |
| If the connection is used in a ``with`` statement, this method will |
| be automatically called at the end of the ``with`` block. |
| """ |
| if self._closed: |
| raise ProgrammingError('The connection is already closed.') |
| for cursor_ref in self._cursors: |
| cursor = cursor_ref() |
| if cursor is not None and not cursor._closed: |
| cursor.close() |
| self._client.close_connection(self._id) |
| self._client.close() |
| self._closed = True |
| |
| @property |
| def closed(self): |
| """Read-only attribute specifying if the connection is closed or not.""" |
| return self._closed |
| |
| def commit(self): |
| if self._closed: |
| raise ProgrammingError('The connection is already closed.') |
| self._client.commit(self._id) |
| |
| def rollback(self): |
| if self._closed: |
| raise ProgrammingError('The connection is already closed.') |
| self._client.rollback(self._id) |
| |
| def cursor(self, cursor_factory=None): |
| """Creates a new cursor. |
| |
| :param cursor_factory: |
| This argument can be used to create non-standard cursors. |
| The class returned must be a subclass of |
| :class:`~phoenixdb.cursor.Cursor` (for example :class:`~phoenixdb.cursor.DictCursor`). |
| A default factory for the connection can also be specified using the |
| :attr:`cursor_factory` attribute. |
| |
| :returns: |
| A :class:`~phoenixdb.cursor.Cursor` object. |
| """ |
| if self._closed: |
| raise ProgrammingError('The connection is already closed.') |
| cursor = (cursor_factory or self.cursor_factory)(self) |
| self._cursors.append(weakref.ref(cursor, self._cursors.remove)) |
| return cursor |
| |
| def set_session(self, **props): |
| """Sets one or more parameters in the current connection. |
| |
| :param autocommit: |
| Switch the connection to autocommit mode. |
| |
| :param readonly: |
| Switch the connection to read-only mode. |
| """ |
| props = Connection._map_legacy_avatica_props(props) |
| self._avatica_props = self._client.connection_sync_dict(self._id, props) |
| |
| @property |
| def autocommit(self): |
| """Read/write attribute for switching the connection's autocommit mode.""" |
| return self._avatica_props['autoCommit'] |
| |
| @autocommit.setter |
| def autocommit(self, value): |
| if self._closed: |
| raise ProgrammingError('The connection is already closed.') |
| self._avatica_props = self._client.connection_sync_dict(self._id, {'autoCommit': bool(value)}) |
| |
| @property |
| def readonly(self): |
| """Read/write attribute for switching the connection's readonly mode.""" |
| return self._avatica_props['readOnly'] |
| |
| @readonly.setter |
| def readonly(self, value): |
| if self._closed: |
| raise ProgrammingError('The connection is already closed.') |
| self._avatica_props = self._client.connection_sync_dict(self._id, {'readOnly': bool(value)}) |
| |
| @property |
| def transactionisolation(self): |
| return self._avatica_props['_transactionIsolation'] |
| |
| @transactionisolation.setter |
| def transactionisolation(self, value): |
| if self._closed: |
| raise ProgrammingError('The connection is already closed.') |
| self._avatica_props = self._client.connection_sync_dict(self._id, {'transactionIsolation': bool(value)}) |
| |
| def meta(self): |
| """Creates a new meta. |
| |
| :returns: |
| A :class:`~phoenixdb.meta` object. |
| """ |
| if self._closed: |
| raise ProgrammingError('The connection is already closed.') |
| meta = Meta(self) |
| return meta |
| |
| |
| for name in errors.__all__: |
| setattr(Connection, name, getattr(errors, name)) |