PHOENIX-6410 Expose primary keys metadata via SqlAlchemy
diff --git a/python-phoenixdb/README.rst b/python-phoenixdb/README.rst
index a7a32f6..2f1a562 100644
--- a/python-phoenixdb/README.rst
+++ b/python-phoenixdb/README.rst
@@ -58,14 +58,14 @@
     python setup.py develop
 
 You can start a Phoenix QueryServer instance on http://localhost:8765 for testing by running
-the following command in the phoenix-queryserver directory::
+the following command in the phoenix-queryserver-parent directory::
 
     mvn clean verify -am -pl phoenix-queryserver-it -Dtest=foo \
     -Dit.test=QueryServerBasicsIT\#startLocalPQS \
     -Ddo.not.randomize.pqs.port=true -Dstart.unsecure.pqs=true
 
 You can start a secure (https+kerberos) Phoenix QueryServer instance on https://localhost:8765
-for testing by running the following command in the phoenix-queryserver directory::
+for testing by running the following command in the phoenix-queryserver-parent directory::
 
     mvn clean verify -am -pl phoenix-queryserver-it -Dtest=foo \
     -Dit.test=SecureQueryServerPhoenixDBIT\#startLocalPQS \
@@ -77,7 +77,7 @@
 If you want to use the library without installing the phoenixdb library, you can use
 the `PYTHONPATH` environment variable to point to the library directly::
 
-    cd $PHOENIX_HOME/python
+    cd phoenix-queryserver-parent/python-phoenixdb
     python setup.py build
     cd ~/my_project
     PYTHONPATH=$PHOENIX_HOME/build/lib python my_app.py
@@ -110,8 +110,8 @@
     pyenv global 2.7.14 3.5.5 3.6.4
     PHOENIXDB_TEST_DB_URL='http://localhost:8765' tox
 
-You can use tox and docker to run the tests on all supported python versions without installing the
-environments locally::
+You can use tox and docker to run the tests on supported python versions up to 3.8 without
+installing the environments locally::
 
     docker build -t toxtest .
     docker run --rm  -v `pwd`:/src toxtest
diff --git a/python-phoenixdb/phoenixdb/avatica/client.py b/python-phoenixdb/phoenixdb/avatica/client.py
index d6f42fe..48b6406 100644
--- a/python-phoenixdb/phoenixdb/avatica/client.py
+++ b/python-phoenixdb/phoenixdb/avatica/client.py
@@ -310,6 +310,17 @@
         response.ParseFromString(response_data)
         return response
 
+    def get_sync_results(self, connection_id, statement_id, state):
+        """Issues an Avatica SyncResults call for the given statement and query state."""
+        request = requests_pb2.SyncResultsRequest()
+        request.connection_id = connection_id
+        request.statement_id = statement_id
+        request.state.CopyFrom(state)
+        response_data = self._apply(request, 'SyncResultsResponse')
+        response = responses_pb2.SyncResultsResponse()
+        response.ParseFromString(response_data)
+        return response
+
     def connection_sync_dict(self, connection_id, connProps=None):
         conn_props = self.connection_sync(connection_id, connProps)
         return {
diff --git a/python-phoenixdb/phoenixdb/cursor.py b/python-phoenixdb/phoenixdb/cursor.py
index ad09106..e716115 100644
--- a/python-phoenixdb/phoenixdb/cursor.py
+++ b/python-phoenixdb/phoenixdb/cursor.py
@@ -246,6 +246,23 @@
             self._connection._id, self._id,
             [self._transform_parameters(p) for p in seq_of_parameters])
 
+    def get_sync_results(self, state):
+        """Runs an Avatica SyncResults call for this cursor's statement."""
+        if self._closed:
+            raise ProgrammingError('The cursor is already closed.')
+        if self._id is None:
+            self._set_id(self._connection._client.create_statement(self._connection._id))
+        return self._connection._client.get_sync_results(self._connection._id, self._id, state)
+
+    def fetch(self, signature):
+        """Fetches the first frame of rows for a statement prepared via SyncResults."""
+        if self._closed:
+            raise ProgrammingError('The cursor is already closed.')
+        self._updatecount = -1
+        self._set_signature(signature)
+        frame = self._connection._client.fetch(self._connection._id, self._id, 0, self.itersize)
+        self._set_frame(frame)
+
     def _transform_row(self, row):
         """Transforms a Row into Python values.
 
diff --git a/python-phoenixdb/phoenixdb/meta.py b/python-phoenixdb/phoenixdb/meta.py
index 18ad147..d5987bb 100644
--- a/python-phoenixdb/phoenixdb/meta.py
+++ b/python-phoenixdb/phoenixdb/meta.py
@@ -16,6 +16,7 @@
 import sys
 import logging
 
+from phoenixdb.avatica.proto import common_pb2
 from phoenixdb.errors import ProgrammingError
 from phoenixdb.cursor import DictCursor
 
@@ -83,6 +84,121 @@
             cursor._process_result(result)
             return cursor.fetchall()
 
+    def get_primary_keys(self, catalog=None, schema=None, table=None):
+        if self._connection._closed:
+            raise ProgrammingError('The connection is already closed.')
+
+        state = common_pb2.QueryState()
+        state.type = common_pb2.StateType.METADATA
+        state.op = common_pb2.MetaDataOperation.GET_PRIMARY_KEYS
+        state.has_args = True
+        state.has_op = True
+
+        catalog_arg = self._moa_string_arg_factory(catalog)
+        schema_arg = self._moa_string_arg_factory(schema)
+        table_arg = self._moa_string_arg_factory(table)
+        state.args.extend([catalog_arg, schema_arg, table_arg])
+
+        with DictCursor(self._connection) as cursor:
+            sync_results = cursor.get_sync_results(state)
+            if not sync_results.more_results:
+                return []
+
+            signature = common_pb2.Signature()
+            signature.columns.append(self._column_meta_data_factory(1, 'TABLE_CAT', 12))
+            signature.columns.append(self._column_meta_data_factory(2, 'TABLE_SCHEM', 12))
+            signature.columns.append(self._column_meta_data_factory(3, 'TABLE_NAME', 12))
+            signature.columns.append(self._column_meta_data_factory(4, 'COLUMN_NAME', 12))
+            signature.columns.append(self._column_meta_data_factory(5, 'KEY_SEQ', 5))
+            signature.columns.append(self._column_meta_data_factory(6, 'PK_NAME', 12))
+            # The following are non-standard Phoenix extensions
+            # This returns '\x00\x00\x00A' or '\x00\x00\x00D', but that's consistent with Java
+            signature.columns.append(self._column_meta_data_factory(7, 'ASC_OR_DESC', 12))
+            signature.columns.append(self._column_meta_data_factory(8, 'DATA_TYPE', 5))
+            signature.columns.append(self._column_meta_data_factory(9, 'TYPE_NAME', 12))
+            signature.columns.append(self._column_meta_data_factory(10, 'COLUMN_SIZE', 5))
+            signature.columns.append(self._column_meta_data_factory(11, 'TYPE_ID', 5))
+            signature.columns.append(self._column_meta_data_factory(12, 'VIEW_CONSTANT', 12))
+
+            cursor.fetch(signature)
+            return cursor.fetchall()
+
+    def get_index_info(self, catalog=None, schema=None, table=None, unique=False, approximate=False):
+        if self._connection._closed:
+            raise ProgrammingError('The connection is already closed.')
+
+        state = common_pb2.QueryState()
+        state.type = common_pb2.StateType.METADATA
+        state.op = common_pb2.MetaDataOperation.GET_INDEX_INFO
+        state.has_args = True
+        state.has_op = True
+
+        catalog_arg = self._moa_string_arg_factory(catalog)
+        schema_arg = self._moa_string_arg_factory(schema)
+        table_arg = self._moa_string_arg_factory(table)
+        unique_arg = self._moa_bool_arg_factory(unique)
+        approximate_arg = self._moa_bool_arg_factory(approximate)
+
+        state.args.extend([catalog_arg, schema_arg, table_arg, unique_arg, approximate_arg])
+
+        with DictCursor(self._connection) as cursor:
+            sync_results = cursor.get_sync_results(state)
+            if not sync_results.more_results:
+                return []
+
+            signature = common_pb2.Signature()
+            signature.columns.append(self._column_meta_data_factory(1, 'TABLE_CAT', 12))
+            signature.columns.append(self._column_meta_data_factory(2, 'TABLE_SCHEM', 12))
+            signature.columns.append(self._column_meta_data_factory(3, 'TABLE_NAME', 12))
+            signature.columns.append(self._column_meta_data_factory(4, 'NON_UNIQUE', 16))
+            signature.columns.append(self._column_meta_data_factory(5, 'INDEX_QUALIFIER', 12))
+            signature.columns.append(self._column_meta_data_factory(6, 'INDEX_NAME', 12))
+            signature.columns.append(self._column_meta_data_factory(7, 'TYPE', 5))
+            signature.columns.append(self._column_meta_data_factory(8, 'ORDINAL_POSITION', 5))
+            signature.columns.append(self._column_meta_data_factory(9, 'COLUMN_NAME', 12))
+            signature.columns.append(self._column_meta_data_factory(10, 'ASC_OR_DESC', 12))
+            signature.columns.append(self._column_meta_data_factory(11, 'CARDINALITY', 5))
+            signature.columns.append(self._column_meta_data_factory(12, 'PAGES', 5))
+            signature.columns.append(self._column_meta_data_factory(13, 'FILTER_CONDITION', 12))
+            # The following are non-standard Phoenix extensions
+            signature.columns.append(self._column_meta_data_factory(14, 'DATA_TYPE', 5))
+            signature.columns.append(self._column_meta_data_factory(15, 'TYPE_NAME', 12))
+            signature.columns.append(self._column_meta_data_factory(16, 'TYPE_ID', 5))
+            signature.columns.append(self._column_meta_data_factory(17, 'COLUMN_FAMILY', 12))
+            signature.columns.append(self._column_meta_data_factory(18, 'COLUMN_SIZE', 5))
+            signature.columns.append(self._column_meta_data_factory(19, 'ARRAY_SIZE', 5))
+
+            cursor.fetch(signature)
+            return cursor.fetchall()
+
+    def _column_meta_data_factory(self, ordinal, column_name, jdbc_code):
+        # jdbc_code is a java.sql.Types constant
+        # (4 = INTEGER, 5 = SMALLINT, 12 = VARCHAR, 16 = BOOLEAN)
+        cmd = common_pb2.ColumnMetaData()
+        cmd.ordinal = ordinal
+        cmd.column_name = column_name
+        cmd.type.id = jdbc_code
+        cmd.nullable = 2  # java.sql.ResultSetMetaData.columnNullableUnknown
+        return cmd
+
+    def _moa_string_arg_factory(self, arg):
+        moa = common_pb2.MetaDataOperationArgument()
+        if arg is None:
+            moa.type = common_pb2.MetaDataOperationArgument.ArgumentType.NULL
+        else:
+            moa.type = common_pb2.MetaDataOperationArgument.ArgumentType.STRING
+            moa.string_value = arg
+        return moa
+
+    def _moa_bool_arg_factory(self, arg):
+        moa = common_pb2.MetaDataOperationArgument()
+        if arg is None:
+            moa.type = common_pb2.MetaDataOperationArgument.ArgumentType.NULL
+        else:
+            moa.type = common_pb2.MetaDataOperationArgument.ArgumentType.BOOL
+            moa.bool_value = arg
+        return moa
+
     def _fix_default(self, rows, catalog=None, schemaPattern=None):
         '''Workaround for PHOENIX-6003'''
         if schemaPattern == '':
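
Putting the two new metadata calls together, a short session could look like the following
sketch (the URL and `DEFAULT_TABLE` are placeholders; each returned row is a dict keyed by the
column names declared in the signatures above)::

    import phoenixdb

    conn = phoenixdb.connect('http://localhost:8765', autocommit=True)
    meta = conn.meta()
    for pk in meta.get_primary_keys(table='DEFAULT_TABLE'):
        print(pk['KEY_SEQ'], pk['COLUMN_NAME'], pk['PK_NAME'])
    for ix in meta.get_index_info(table='DEFAULT_TABLE'):
        print(ix['INDEX_NAME'], ix['ORDINAL_POSITION'], ix['COLUMN_NAME'])
    conn.close()
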
diff --git a/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py b/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py
index b402322..8661417 100644
--- a/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py
+++ b/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py
@@ -166,19 +166,45 @@
     def get_pk_constraint(self, connection, table_name, schema=None, **kw):
         if schema is None:
             schema = ''
-        columns = connection.connect().connection.meta().get_columns(
-            schemaPattern=schema, tableNamePattern=table_name, *kw)
-        pk_columns = [col['COLUMN_NAME'] for col in columns if col['KEY_SEQ'] > 0]
-        return {'constrained_columns': pk_columns}
+        raw = connection.connect().connection.meta().get_primary_keys(
+            schema=schema, table=table_name)
+        cooked = {
+            'constrained_columns': []
+        }
+        if raw:
+            cooked['name'] = raw[0]['PK_NAME']
+            for row in raw:
+                cooked['constrained_columns'].insert(row['KEY_SEQ'] - 1, row['COLUMN_NAME'])
+        return cooked
 
-    def get_indexes(self, conn, table_name, schema=None, **kw):
-        '''This information does not seem to be exposed via Avatica
-        TODO: Implement by directly querying SYSTEM tables ? '''
-        return []
+    def get_indexes(self, connection, table_name, schema=None, **kw):
+        if schema is None:
+            schema = ''
+        raw = connection.connect().connection.meta().get_index_info(schema=schema, table=table_name)
+        # We know that Phoenix returns the rows ordered by INDEX_NAME and ORDINAL_POSITION
+        cooked = []
+        current = None
+        for row in raw:
+            if current is None or row['INDEX_NAME'] != current['name']:
+                current = {
+                    'name': row['INDEX_NAME'],
+                    'unique': not row['NON_UNIQUE'],
+                    'column_names': [],
+                }
+                cooked.append(current)
+            # Phoenix returns the column names in its internal representation here
+            # Remove the default CF prefix
+            canonical_name = row['COLUMN_NAME']
+            if canonical_name.startswith('0:'):
+                canonical_name = canonical_name[len('0:'):]
+            if canonical_name.startswith(':'):
+                canonical_name = canonical_name[len(':'):]
+            current['column_names'].append(canonical_name)
+        return cooked
 
     def get_foreign_keys(self, conn, table_name, schema=None, **kw):
         '''Foreign keys are a foreign concept to Phoenix,
-        but SqlAlchemy cannot parse the DB schema if it's not implemented '''
+        and SqlAlchemy cannot parse the DB schema if it's not implemented '''
         return []
 
     def _map_column(self, raw):
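
From application code these dialect methods are normally reached through SQLAlchemy's
inspection API rather than called directly; a minimal sketch (connection URL and table name
are placeholders) would be::

    import sqlalchemy as db

    engine = db.create_engine('phoenix://localhost:8765')
    inspector = db.inspect(engine)
    print(inspector.get_pk_constraint('US_POPULATION'))  # name + ordered PK columns
    print(inspector.get_indexes('US_POPULATION'))        # one dict per index
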
diff --git a/python-phoenixdb/phoenixdb/tests/test_db.py b/python-phoenixdb/phoenixdb/tests/test_db.py
index da12b23..0c04b11 100644
--- a/python-phoenixdb/phoenixdb/tests/test_db.py
+++ b/python-phoenixdb/phoenixdb/tests/test_db.py
@@ -17,7 +17,7 @@
 
 import phoenixdb.cursor
 from phoenixdb.connection import Connection
-from phoenixdb.errors import InternalError
+from phoenixdb.errors import InternalError, ProgrammingError
 from phoenixdb.tests import DatabaseTestCase, TEST_DB_URL
 
 
@@ -172,6 +172,63 @@
                     {'TABLE_TYPE': 'VIEW'}]))
 
                 self.assertEqual(meta.get_type_info(), [])
+
+            finally:
+                cursor.execute('drop table if exists DEFAULT_TABLE')
+                cursor.execute('drop table if exists A_SCHEMA.A_TABLE')
+                cursor.execute('drop table if exists B_SCHEMA.B_TABLE')
+
+    def test_meta2(self):
+        with self.conn.cursor() as cursor:
+            try:
+                cursor.execute('drop table if exists DEFAULT_TABLE')
+                cursor.execute('drop table if exists A_SCHEMA.A_TABLE')
+                cursor.execute('drop table if exists B_SCHEMA.B_TABLE')
+
+                cursor.execute('''create table DEFAULT_TABLE (ID integer not null, ID2 varchar not null,
+                V1 integer, V2 varchar, constraint PK PRIMARY KEY (ID DESC, ID2 ASC))''')
+                cursor.execute('CREATE INDEX GLOBAL_IDX ON DEFAULT_TABLE (V1) INCLUDE (V2)')
+                cursor.execute('CREATE LOCAL INDEX LOCAL_IDX ON DEFAULT_TABLE (V1)')
+                cursor.execute('create table A_SCHEMA.A_TABLE (ID_A integer primary key)')
+                cursor.execute('create table B_SCHEMA.B_TABLE (ID_B integer primary key)')
+
+                meta = self.conn.meta()
+                self.maxDiff = None
+                self.assertEqual(
+                    meta.get_primary_keys(table='DEFAULT_TABLE'),
+                    [{'ASC_OR_DESC': '\x00\x00\x00D',
+                      'COLUMN_NAME': 'ID',
+                      'COLUMN_SIZE': None,
+                      'DATA_TYPE': 4,
+                      'KEY_SEQ': 1,
+                      'PK_NAME': 'PK',
+                      'TABLE_CAT': None,
+                      'TABLE_NAME': 'DEFAULT_TABLE',
+                      'TABLE_SCHEM': None,
+                      'TYPE_ID': 4,
+                      'TYPE_NAME': 'INTEGER',
+                      'VIEW_CONSTANT': None},
+                     {'ASC_OR_DESC': '\x00\x00\x00A',
+                      'COLUMN_NAME': 'ID2',
+                      'COLUMN_SIZE': None,
+                      'DATA_TYPE': 12,
+                      'KEY_SEQ': 2,
+                      'PK_NAME': 'PK',
+                      'TABLE_CAT': None,
+                      'TABLE_NAME': 'DEFAULT_TABLE',
+                      'TABLE_SCHEM': None,
+                      'TYPE_ID': 12,
+                      'TYPE_NAME': 'VARCHAR',
+                      'VIEW_CONSTANT': None}])
+                self.assertEqual(len(meta.get_primary_keys(schema='A_SCHEMA', table='A_TABLE')), 1)
+                # Phoenix raises when the table does not exist in the given schema
+                with self.assertRaises(ProgrammingError):
+                    meta.get_primary_keys(schema='A_SCHEMA', table='B_TABLE')
+
+                self.assertEqual(meta.get_index_info(table='NON_EXISTENT'), [])
+
+                self.assertTrue(len(meta.get_index_info(table='DEFAULT_TABLE')) > 1)
+
             finally:
                 cursor.execute('drop table if exists DEFAULT_TABLE')
                 cursor.execute('drop table if exists A_SCHEMA.A_TABLE')
diff --git a/python-phoenixdb/phoenixdb/tests/test_sqlalchemy.py b/python-phoenixdb/phoenixdb/tests/test_sqlalchemy.py
index 37ed5a0..c996262 100644
--- a/python-phoenixdb/phoenixdb/tests/test_sqlalchemy.py
+++ b/python-phoenixdb/phoenixdb/tests/test_sqlalchemy.py
@@ -18,6 +18,7 @@
 
 import sqlalchemy as db
 from sqlalchemy import text
+from sqlalchemy.types import BIGINT, CHAR, VARCHAR
 
 from . import TEST_DB_AUTHENTICATION, TEST_DB_AVATICA_PASSWORD, TEST_DB_AVATICA_USER, \
     TEST_DB_TRUSTSTORE, TEST_DB_URL
@@ -103,8 +104,27 @@
                 city VARCHAR NOT NULL,
                 population BIGINT
                 CONSTRAINT my_pk PRIMARY KEY (state, city))'''))
+                connection.execute('CREATE INDEX GLOBAL_IDX ON US_POPULATION (state) INCLUDE (city)')
+                connection.execute('CREATE LOCAL INDEX LOCAL_IDX ON US_POPULATION (population)')
+
                 columns_result = inspector.get_columns('US_POPULATION')
-                self.assertEqual(len(columns_result), 3)
+                # The type objects don't compare equal, so compare string representations
+                self.assertEqual(str(columns_result),
+                                 str([{'name': 'STATE', 'type': CHAR(), 'nullable': True,
+                                       'autoincrement': False, 'comment': '', 'default': None},
+                                      {'name': 'CITY', 'type': VARCHAR(), 'nullable': True,
+                                       'autoincrement': False, 'comment': '', 'default': None},
+                                      {'name': 'POPULATION', 'type': BIGINT(), 'nullable': True,
+                                       'autoincrement': False, 'comment': '', 'default': None}]))
+
+                indexes_result = inspector.get_indexes('US_POPULATION')
+                self.assertEqual(indexes_result,
+                                 [{'name': 'GLOBAL_IDX', 'unique': False, 'column_names': ['STATE', 'CITY']},
+                                  {'name': 'LOCAL_IDX', 'unique': False, 'column_names': ['_INDEX_ID', 'POPULATION', 'STATE', 'CITY']}])
+
+                pk_result = inspector.get_pk_constraint('US_POPULATION')
+                self.assertEqual(pk_result, {'constrained_columns': ['STATE', 'CITY'], 'name': 'MY_PK'})
+
             finally:
                 connection.execute('drop table if exists us_population')