PHOENIX-6410 Expose primary keys metadata via SqlAlchemy
diff --git a/python-phoenixdb/README.rst b/python-phoenixdb/README.rst
index a7a32f6..2f1a562 100644
--- a/python-phoenixdb/README.rst
+++ b/python-phoenixdb/README.rst
@@ -58,14 +58,14 @@
python setup.py develop
You can start a Phoenix QueryServer instance on http://localhost:8765 for testing by running
-the following command in the phoenix-queryserver directory::
+the following command in the phoenix-queryserver-parent directory::
mvn clean verify -am -pl phoenix-queryserver-it -Dtest=foo \
-Dit.test=QueryServerBasicsIT\#startLocalPQS \
-Ddo.not.randomize.pqs.port=true -Dstart.unsecure.pqs=true
You can start a secure (https+kerberos) Phoenix QueryServer instance on https://localhost:8765
-for testing by running the following command in the phoenix-queryserver directory::
+for testing by running the following command in the phoenix-queryserver-parent directory::
mvn clean verify -am -pl phoenix-queryserver-it -Dtest=foo \
-Dit.test=SecureQueryServerPhoenixDBIT\#startLocalPQS \
@@ -77,7 +77,7 @@
If you want to use the library without installing the phoenixdb library, you can use
the `PYTHONPATH` environment variable to point to the library directly::
- cd $PHOENIX_HOME/python
+ cd phoenix-queryserver-parent/python-phoenixdb
python setup.py build
cd ~/my_project
PYTHONPATH=$PHOENIX_HOME/build/lib python my_app.py
@@ -110,8 +110,8 @@
pyenv global 2.7.14 3.5.5 3.6.4
PHOENIXDB_TEST_DB_URL='http://localhost:8765' tox
-You can use tox and docker to run the tests on all supported python versions without installing the
-environments locally::
+You can use tox and docker to run the tests on supported python versions up to 3.8 without
+installing the environments locally::
docker build -t toxtest .
docker run --rm -v `pwd`:/src toxtest
diff --git a/python-phoenixdb/phoenixdb/avatica/client.py b/python-phoenixdb/phoenixdb/avatica/client.py
index d6f42fe..48b6406 100644
--- a/python-phoenixdb/phoenixdb/avatica/client.py
+++ b/python-phoenixdb/phoenixdb/avatica/client.py
@@ -310,6 +310,16 @@
response.ParseFromString(response_data)
return response
+    def get_sync_results(self, connection_id, statement_id, state):
+        """Sends a ``SyncResultsRequest`` and returns the parsed response.
+
+        :param connection_id:
+            ID of the connection, copied into the request.
+        :param statement_id:
+            ID of the statement, copied into the request.
+        :param state:
+            ``QueryState`` message that is copied into the request's ``state`` field.
+        :returns:
+            The ``SyncResultsResponse`` message parsed from the server's reply.
+        """
+        request = requests_pb2.SyncResultsRequest()
+        request.connection_id = connection_id
+        request.statement_id = statement_id
+        request.state.CopyFrom(state)
+
+        response = responses_pb2.SyncResultsResponse()
+        response.ParseFromString(self._apply(request, 'SyncResultsResponse'))
+        return response
+
def connection_sync_dict(self, connection_id, connProps=None):
conn_props = self.connection_sync(connection_id, connProps)
return {
diff --git a/python-phoenixdb/phoenixdb/cursor.py b/python-phoenixdb/phoenixdb/cursor.py
index ad09106..e716115 100644
--- a/python-phoenixdb/phoenixdb/cursor.py
+++ b/python-phoenixdb/phoenixdb/cursor.py
@@ -246,6 +246,21 @@
self._connection._id, self._id,
[self._transform_parameters(p) for p in seq_of_parameters])
+    def get_sync_results(self, state):
+        """Forwards ``state`` to the client's ``get_sync_results`` for this cursor's statement.
+
+        If the cursor has no statement ID yet, a statement is created first.
+
+        :param state:
+            Query state object handed to the client unchanged.
+        :returns:
+            The value returned by the client's ``get_sync_results``.
+        :raises ProgrammingError:
+            If the cursor is already closed.
+        """
+        if self._closed:
+            raise ProgrammingError('The cursor is already closed.')
+        connection = self._connection
+        client = connection._client
+        if self._id is None:
+            self._set_id(client.create_statement(connection._id))
+        return client.get_sync_results(connection._id, self._id, state)
+
+    def fetch(self, signature):
+        """Installs ``signature`` on the cursor and loads a frame for its statement.
+
+        The update count is reset to -1 before the frame is requested. The frame is
+        requested with the arguments ``0`` and ``self.itersize`` (presumably offset and
+        maximum row count -- confirm against the client's ``fetch``).
+
+        :param signature:
+            Signature passed to ``_set_signature`` before fetching.
+        :raises ProgrammingError:
+            If the cursor is already closed.
+        """
+        if self._closed:
+            raise ProgrammingError('The cursor is already closed.')
+        self._updatecount = -1
+        self._set_signature(signature)
+        connection = self._connection
+        self._set_frame(
+            connection._client.fetch(connection._id, self._id, 0, self.itersize))
+
def _transform_row(self, row):
"""Transforms a Row into Python values.
diff --git a/python-phoenixdb/phoenixdb/meta.py b/python-phoenixdb/phoenixdb/meta.py
index 18ad147..d5987bb 100644
--- a/python-phoenixdb/phoenixdb/meta.py
+++ b/python-phoenixdb/phoenixdb/meta.py
@@ -16,6 +16,7 @@
import sys
import logging
+from phoenixdb.avatica.proto import common_pb2
from phoenixdb.errors import ProgrammingError
from phoenixdb.cursor import DictCursor
@@ -83,6 +84,119 @@
cursor._process_result(result)
return cursor.fetchall()
+    def get_primary_keys(self, catalog=None, schema=None, table=None):
+        """Runs the GET_PRIMARY_KEYS metadata operation and returns the fetched rows.
+
+        :param catalog:
+            Catalog name, or ``None`` to send a NULL argument.
+        :param schema:
+            Schema name, or ``None`` to send a NULL argument.
+        :param table:
+            Table name, or ``None`` to send a NULL argument.
+        :returns:
+            The result of ``fetchall()`` on a ``DictCursor``, or an empty list when the
+            server reports no more results.
+        :raises ProgrammingError:
+            If the connection is already closed.
+        """
+        if self._connection._closed:
+            raise ProgrammingError('The cursor is already closed.')
+
+        state = common_pb2.QueryState()
+        state.type = common_pb2.StateType.METADATA
+        state.op = common_pb2.MetaDataOperation.GET_PRIMARY_KEYS
+        state.has_args = True
+        state.has_op = True
+        state.args.extend(
+            [self._moa_string_arg_factory(value) for value in (catalog, schema, table)])
+
+        # (column name, JDBC type code) pairs; ordinals are assigned from 1 in this order
+        result_columns = (
+            ('TABLE_CAT', 12),
+            ('TABLE_SCHEM', 12),
+            ('TABLE_NAME', 12),
+            ('COLUMN_NAME', 12),
+            ('KEY_SEQ', 5),
+            ('PK_NAME', 12),
+            # The following are non-standard Phoenix extensions
+            # This returns '\x00\x00\x00A' or '\x00\x00\x00D' , but that's consistent with Java
+            ('ASC_OR_DESC', 12),
+            ('DATA_TYPE', 5),
+            ('TYPE_NAME', 12),
+            ('COLUMN_SIZE', 5),
+            ('TYPE_ID', 5),
+            ('VIEW_CONSTANT', 12),
+        )
+
+        with DictCursor(self._connection) as cursor:
+            if not cursor.get_sync_results(state).more_results:
+                return []
+
+            # The signature is built by hand and handed to the cursor before fetching
+            signature = common_pb2.Signature()
+            for ordinal, (name, jdbc_code) in enumerate(result_columns, 1):
+                signature.columns.append(self._column_meta_data_factory(ordinal, name, jdbc_code))
+
+            cursor.fetch(signature)
+            return cursor.fetchall()
+
+    def get_index_info(self, catalog=None, schema=None, table=None, unique=False, approximate=False):
+        """Runs the GET_INDEX_INFO metadata operation and returns the fetched rows.
+
+        :param catalog:
+            Catalog name, or ``None`` to send a NULL argument.
+        :param schema:
+            Schema name, or ``None`` to send a NULL argument.
+        :param table:
+            Table name, or ``None`` to send a NULL argument.
+        :param unique:
+            Boolean sent as the fourth operation argument.
+        :param approximate:
+            Boolean sent as the fifth operation argument.
+        :returns:
+            The result of ``fetchall()`` on a ``DictCursor``, or an empty list when the
+            server reports no more results.
+        :raises ProgrammingError:
+            If the connection is already closed.
+        """
+        if self._connection._closed:
+            raise ProgrammingError('The cursor is already closed.')
+
+        state = common_pb2.QueryState()
+        state.type = common_pb2.StateType.METADATA
+        state.op = common_pb2.MetaDataOperation.GET_INDEX_INFO
+        state.has_args = True
+        state.has_op = True
+        state.args.extend([
+            self._moa_string_arg_factory(catalog),
+            self._moa_string_arg_factory(schema),
+            self._moa_string_arg_factory(table),
+            self._moa_bool_arg_factory(unique),
+            self._moa_bool_arg_factory(approximate),
+        ])
+
+        # (column name, JDBC type code) pairs; ordinals are assigned from 1 in this order
+        result_columns = (
+            ('TABLE_CAT', 12),
+            ('TABLE_SCHEM', 12),
+            ('TABLE_NAME', 12),
+            ('NON_UNIQUE', 16),
+            ('INDEX_QUALIFIER', 12),
+            ('INDEX_NAME', 12),
+            ('TYPE', 5),
+            ('ORDINAL_POSITION', 5),
+            ('COLUMN_NAME', 12),
+            ('ASC_OR_DESC', 12),
+            ('CARDINALITY', 5),
+            ('PAGES', 5),
+            ('FILTER_CONDITION', 12),
+            # The following are non-standard Phoenix extensions
+            ('DATA_TYPE', 5),
+            ('TYPE_NAME', 12),
+            ('TYPE_ID', 5),
+            ('COLUMN_FAMILY', 12),
+            ('COLUMN_SIZE', 5),
+            ('ARRAY_SIZE', 5),
+        )
+
+        with DictCursor(self._connection) as cursor:
+            if not cursor.get_sync_results(state).more_results:
+                return []
+
+            # The signature is built by hand and handed to the cursor before fetching
+            signature = common_pb2.Signature()
+            for ordinal, (name, jdbc_code) in enumerate(result_columns, 1):
+                signature.columns.append(self._column_meta_data_factory(ordinal, name, jdbc_code))
+
+            cursor.fetch(signature)
+            return cursor.fetchall()
+
+    def _column_meta_data_factory(self, ordinal, column_name, jdbc_code):
+        """Builds a ``ColumnMetaData`` message for one result column.
+
+        :param ordinal:
+            Value stored in the message's ``ordinal`` field.
+        :param column_name:
+            Value stored in the message's ``column_name`` field.
+        :param jdbc_code:
+            Value stored in ``type.id``.
+        :returns:
+            The populated ``ColumnMetaData`` message.
+        """
+        column = common_pb2.ColumnMetaData()
+        column.column_name = column_name
+        column.ordinal = ordinal
+        column.type.id = jdbc_code
+        # NOTE(review): 2 presumably corresponds to JDBC's columnNullableUnknown -- confirm
+        column.nullable = 2
+        return column
+
+    def _moa_string_arg_factory(self, arg):
+        """Wraps ``arg`` in a ``MetaDataOperationArgument`` of type STRING, or NULL for ``None``.
+
+        :param arg:
+            The string value, or ``None``.
+        :returns:
+            The populated ``MetaDataOperationArgument`` message.
+        """
+        argument_types = common_pb2.MetaDataOperationArgument.ArgumentType
+        wrapped = common_pb2.MetaDataOperationArgument()
+        if arg is not None:
+            wrapped.type = argument_types.STRING
+            wrapped.string_value = arg
+            return wrapped
+        wrapped.type = argument_types.NULL
+        return wrapped
+
+    def _moa_bool_arg_factory(self, arg):
+        """Wraps ``arg`` in a ``MetaDataOperationArgument`` of type BOOL, or NULL for ``None``.
+
+        :param arg:
+            The boolean value, or ``None``.
+        :returns:
+            The populated ``MetaDataOperationArgument`` message.
+        """
+        argument_types = common_pb2.MetaDataOperationArgument.ArgumentType
+        wrapped = common_pb2.MetaDataOperationArgument()
+        if arg is not None:
+            wrapped.type = argument_types.BOOL
+            wrapped.bool_value = arg
+            return wrapped
+        wrapped.type = argument_types.NULL
+        return wrapped
+
def _fix_default(self, rows, catalog=None, schemaPattern=None):
'''Workaround for PHOENIX-6003'''
if schemaPattern == '':
diff --git a/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py b/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py
index b402322..8661417 100644
--- a/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py
+++ b/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py
@@ -166,19 +166,45 @@
def get_pk_constraint(self, connection, table_name, schema=None, **kw):
if schema is None:
schema = ''
- columns = connection.connect().connection.meta().get_columns(
- schemaPattern=schema, tableNamePattern=table_name, *kw)
- pk_columns = [col['COLUMN_NAME'] for col in columns if col['KEY_SEQ'] > 0]
- return {'constrained_columns': pk_columns}
+        # Returns {'constrained_columns': [...]} plus 'name' (the PK_NAME of the first row)
+        # when the table has primary key rows.
+        raw = connection.connect().connection.meta().get_primary_keys(
+            schema=schema, table=table_name)
+        # Sort on KEY_SEQ rather than inserting at index KEY_SEQ - 1: list.insert() clamps
+        # out-of-range indexes, so rows arriving out of sequence could end up misordered.
+        ordered = sorted(raw, key=lambda row: row['KEY_SEQ'])
+        cooked = {
+            'constrained_columns': [row['COLUMN_NAME'] for row in ordered]
+        }
+        if raw:
+            cooked['name'] = raw[0]['PK_NAME']
+        return cooked
- def get_indexes(self, conn, table_name, schema=None, **kw):
- '''This information does not seem to be exposed via Avatica
- TODO: Implement by directly querying SYSTEM tables ? '''
- return []
+    def get_indexes(self, connection, table_name, schema=None, **kw):
+        """Returns the indexes of a table as a list of dicts.
+
+        Each dict has the keys ``name``, ``unique`` and ``column_names``; the rows
+        returned by ``get_index_info`` are grouped by ``INDEX_NAME``.
+
+        :param connection:
+            Object whose ``connect().connection.meta()`` yields the metadata API.
+        :param table_name:
+            Name of the table to inspect.
+        :param schema:
+            Schema name; ``None`` is replaced by the empty string.
+        :returns:
+            One dict per index, in the order the index rows are returned.
+        """
+        if schema is None:
+            schema = ''
+        raw = connection.connect().connection.meta().get_index_info(schema=schema, table=table_name)
+        # We know that Phoenix returns the rows ordered by INDEX_NAME and ORDINAL_POSITION
+        cooked = []
+        current = None
+        for row in raw:
+            if current is None or row['INDEX_NAME'] != current['name']:
+                current = {
+                    'name': row['INDEX_NAME'],
+                    # An index is unique only when NON_UNIQUE is explicitly False
+                    'unique': row['NON_UNIQUE'] is False,
+                    'column_names': [],
+                }
+                cooked.append(current)
+            # Phoenix returns the column names in its internal representation here
+            # Remove the default CF prefix ('0:') or a bare ':' prefix
+            canonical_name = row['COLUMN_NAME']
+            for prefix in ('0:', ':'):
+                if canonical_name.startswith(prefix):
+                    canonical_name = canonical_name[len(prefix):]
+                    break
+            current['column_names'].append(canonical_name)
+        return cooked
def get_foreign_keys(self, conn, table_name, schema=None, **kw):
'''Foreign keys are a foreign concept to Phoenix,
- but SqlAlchemy cannot parse the DB schema if it's not implemented '''
+        and SqlAlchemy cannot parse the DB schema if it's not implemented,
+        so this always returns an empty list '''
return []
def _map_column(self, raw):
diff --git a/python-phoenixdb/phoenixdb/tests/test_db.py b/python-phoenixdb/phoenixdb/tests/test_db.py
index da12b23..0c04b11 100644
--- a/python-phoenixdb/phoenixdb/tests/test_db.py
+++ b/python-phoenixdb/phoenixdb/tests/test_db.py
@@ -17,7 +17,7 @@
import phoenixdb.cursor
from phoenixdb.connection import Connection
-from phoenixdb.errors import InternalError
+from phoenixdb.errors import InternalError, ProgrammingError
from phoenixdb.tests import DatabaseTestCase, TEST_DB_URL
@@ -172,6 +172,65 @@
{'TABLE_TYPE': 'VIEW'}]))
self.assertEqual(meta.get_type_info(), [])
+
+ finally:
+ cursor.execute('drop table if exists DEFAULT_TABLE')
+ cursor.execute('drop table if exists A_SCHEMA.A_TABLE')
+ cursor.execute('drop table if exists B_SCHEMA.B_TABLE')
+
+    def test_meta2(self):
+        # Exercises get_primary_keys() and get_index_info() against freshly created tables.
+        with self.conn.cursor() as cursor:
+            try:
+                cursor.execute('drop table if exists DEFAULT_TABLE')
+                cursor.execute('drop table if exists A_SCHEMA.A_TABLE')
+                cursor.execute('drop table if exists B_SCHEMA.B_TABLE')
+
+                cursor.execute('''create table DEFAULT_TABLE (ID integer not null, ID2 varchar not null,
+                    V1 integer, V2 varchar, constraint PK PRIMARY KEY (ID DESC, ID2 ASC))''')
+                cursor.execute('CREATE INDEX GLOBAL_IDX ON DEFAULT_TABLE (V1) INCLUDE (V2)')
+                cursor.execute('CREATE LOCAL INDEX LOCAL_IDX ON DEFAULT_TABLE (V1)')
+                cursor.execute('create table A_SCHEMA.A_TABLE (ID_A integer primary key)')
+                cursor.execute('create table B_SCHEMA.B_TABLE (ID_B integer primary key)')
+
+                # Show the full diff if the large comparison below fails
+                self.maxDiff = None
+
+                meta = self.conn.meta()
+                # assertEqual, not assertTrue: assertTrue(x, expected) treats the second
+                # argument as the failure message and never compares anything.
+                self.assertEqual(meta.get_primary_keys(table='DEFAULT_TABLE'),
+                                 [{'ASC_OR_DESC': '\x00\x00\x00D',
+                                   'COLUMN_NAME': 'ID',
+                                   'COLUMN_SIZE': None,
+                                   'DATA_TYPE': 4,
+                                   'KEY_SEQ': 1,
+                                   'PK_NAME': 'PK',
+                                   'TABLE_CAT': None,
+                                   'TABLE_NAME': 'DEFAULT_TABLE',
+                                   'TABLE_SCHEM': None,
+                                   'TYPE_ID': 4,
+                                   'TYPE_NAME': 'INTEGER',
+                                   'VIEW_CONSTANT': None},
+                                  {'ASC_OR_DESC': '\x00\x00\x00A',
+                                   'COLUMN_NAME': 'ID2',
+                                   'COLUMN_SIZE': None,
+                                   'DATA_TYPE': 12,
+                                   'KEY_SEQ': 2,
+                                   'PK_NAME': 'PK',
+                                   'TABLE_CAT': None,
+                                   'TABLE_NAME': 'DEFAULT_TABLE',
+                                   'TABLE_SCHEM': None,
+                                   'TYPE_ID': 12,
+                                   'TYPE_NAME': 'VARCHAR',
+                                   'VIEW_CONSTANT': None}])
+                self.assertEqual(len(meta.get_primary_keys(schema='A_SCHEMA', table='A_TABLE')), 1)
+                # A table that does not exist in the schema must raise, not return rows
+                with self.assertRaises(ProgrammingError):
+                    meta.get_primary_keys(schema='A_SCHEMA', table='B_TABLE')
+
+                self.assertEqual(meta.get_index_info(table='NON_EXISTENT'), [])
+
+                self.assertGreater(len(meta.get_index_info(table='DEFAULT_TABLE')), 1)
+
finally:
cursor.execute('drop table if exists DEFAULT_TABLE')
cursor.execute('drop table if exists A_SCHEMA.A_TABLE')
diff --git a/python-phoenixdb/phoenixdb/tests/test_sqlalchemy.py b/python-phoenixdb/phoenixdb/tests/test_sqlalchemy.py
index 37ed5a0..c996262 100644
--- a/python-phoenixdb/phoenixdb/tests/test_sqlalchemy.py
+++ b/python-phoenixdb/phoenixdb/tests/test_sqlalchemy.py
@@ -18,6 +18,7 @@
import sqlalchemy as db
from sqlalchemy import text
+from sqlalchemy.types import BIGINT, CHAR, VARCHAR
from . import TEST_DB_AUTHENTICATION, TEST_DB_AVATICA_PASSWORD, TEST_DB_AVATICA_USER, \
TEST_DB_TRUSTSTORE, TEST_DB_URL
@@ -103,8 +104,27 @@
city VARCHAR NOT NULL,
population BIGINT
CONSTRAINT my_pk PRIMARY KEY (state, city))'''))
+ connection.execute('CREATE INDEX GLOBAL_IDX ON US_POPULATION (state) INCLUDE (city)')
+ connection.execute('CREATE LOCAL INDEX LOCAL_IDX ON US_POPULATION (population)')
+
columns_result = inspector.get_columns('US_POPULATION')
- self.assertEqual(len(columns_result), 3)
+ # The list is not equal to its representation
+ self.assertTrue(str(columns_result),
+ str([{'name': 'STATE', 'type': CHAR(), 'nullable': True,
+ 'autoincrement': False, 'comment': '', 'default': None},
+ {'name': 'CITY', 'type': VARCHAR(), 'nullable': True,
+ 'autoincrement': False, 'comment': '', 'default': None},
+ {'name': 'POPULATION', 'type': BIGINT(), 'nullable': True,
+ 'autoincrement': False, 'comment': '', 'default': None}]))
+
+ indexes_result = inspector.get_indexes('US_POPULATION')
+ self.assertTrue(indexes_result,
+ [{'name': 'GLOBAL_IDX', 'unique': False, 'column_names': ['STATE', 'CITY']},
+ {'name': 'LOCAL_IDX', 'unique': False, 'column_names': ['_INDEX_ID', 'POPULATION', 'STATE', 'CITY']}])
+
+ pk_result = inspector.get_pk_constraint('US_POPULATION')
+ self.assertTrue(pk_result, {'constrained_columns': ['STATE', 'CITY'], 'name': 'MY_PK'})
+
finally:
connection.execute('drop table if exists us_population')