blob: c996262918593d849da32fdc439ccae4ef6c350d [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import unittest
import sqlalchemy as db
from sqlalchemy import text
from sqlalchemy.types import BIGINT, CHAR, VARCHAR
from . import TEST_DB_AUTHENTICATION, TEST_DB_AVATICA_PASSWORD, TEST_DB_AVATICA_USER, \
TEST_DB_TRUSTSTORE, TEST_DB_URL
if sys.version_info.major == 3:
from urllib.parse import urlparse, urlunparse
else:
from urlparse import urlparse, urlunparse
@unittest.skipIf(TEST_DB_URL is None, "these tests require the PHOENIXDB_TEST_DB_URL environment variable set to a clean database")
class SQLAlchemyTest(unittest.TestCase):
def test_connection(self):
engine = self._create_engine()
# connection = engine.connect()
metadata = db.MetaData()
catalog = db.Table('CATALOG', metadata, schema='SYSTEM', autoload=True, autoload_with=engine)
self.assertIn('TABLE_NAME', catalog.columns.keys())
def test_textual(self):
engine = self._create_engine()
with engine.connect() as connection:
try:
connection.execute('drop table if exists ALCHEMY_TEST')
connection.execute(text('create table ALCHEMY_TEST (id integer primary key)'))
connection.execute(text('upsert into ALCHEMY_TEST values (42)'))
# SQLAlchemy autocommit should kick in
result = connection.execute(text('select * from ALCHEMY_TEST'))
row = result.fetchone()
self.assertEqual(row[0], 42)
finally:
connection.execute('drop table if exists ALCHEMY_TEST')
def test_schema_filtering(self):
engine = self._create_engine()
with engine.connect() as connection:
try:
inspector = db.inspect(engine)
connection.execute('drop table if exists ALCHEMY_TEST')
connection.execute('drop table if exists A.ALCHEMY_TEST_A')
connection.execute('drop table if exists B.ALCHEMY_TEST_B')
self.assertEqual(inspector.get_schema_names(), ['', 'SYSTEM'])
connection.execute(text('create table ALCHEMY_TEST (ID integer primary key)'))
connection.execute(text('create table A.ALCHEMY_TEST_A (ID_A integer primary key)'))
connection.execute(text('create table B.ALCHEMY_TEST_B (ID_B integer primary key)'))
self.assertEqual(inspector.get_schema_names(), ['', 'A', 'B', 'SYSTEM'])
self.assertEqual(inspector.get_table_names(), ['ALCHEMY_TEST'])
self.assertEqual(inspector.get_table_names(''), ['ALCHEMY_TEST'])
self.assertEqual(inspector.get_table_names('A'), ['ALCHEMY_TEST_A'])
self.assertEqual(inspector.get_table_names('B'), ['ALCHEMY_TEST_B'])
self.assertEqual(inspector.get_columns('ALCHEMY_TEST').pop()['name'], 'ID')
self.assertEqual(
inspector.get_columns('ALCHEMY_TEST', '').pop()['name'], 'ID')
self.assertEqual(
inspector.get_columns('ALCHEMY_TEST_A', 'A').pop()['name'], 'ID_A')
self.assertTrue(engine.has_table('ALCHEMY_TEST'))
self.assertFalse(engine.has_table('ALCHEMY_TEST', 'A'))
self.assertTrue(engine.has_table('ALCHEMY_TEST_A', 'A'))
self.assertFalse(engine.has_table('ALCHEMY_TEST', 'A'))
finally:
connection.execute('drop table if exists ALCHEMY_TEST')
connection.execute('drop table if exists A.ALCHEMY_TEST_A')
connection.execute('drop table if exists B.ALCHEMY_TEST_B')
def test_reflection(self):
engine = self._create_engine()
with engine.connect() as connection:
try:
inspector = db.inspect(engine)
columns_result = inspector.get_columns('DOES_NOT_EXIST')
self.assertEqual([], columns_result)
connection.execute('drop table if exists us_population')
connection.execute(text('''create table if not exists US_POPULATION (
state CHAR(2) NOT NULL,
city VARCHAR NOT NULL,
population BIGINT
CONSTRAINT my_pk PRIMARY KEY (state, city))'''))
connection.execute('CREATE INDEX GLOBAL_IDX ON US_POPULATION (state) INCLUDE (city)')
connection.execute('CREATE LOCAL INDEX LOCAL_IDX ON US_POPULATION (population)')
columns_result = inspector.get_columns('US_POPULATION')
# The list is not equal to its represenatation
self.assertTrue(str(columns_result),
str([{'name': 'STATE', 'type': CHAR(), 'nullable': True,
'autoincrement': False, 'comment': '', 'default': None},
{'name': 'CITY', 'type': VARCHAR(), 'nullable': True,
'autoincrement': False, 'comment': '', 'default': None},
{'name': 'POPULATION', 'type': BIGINT(), 'nullable': True,
'autoincrement': False, 'comment': '', 'default': None}]))
indexes_result = inspector.get_indexes('US_POPULATION')
self.assertTrue(indexes_result,
[{'name': 'GLOBAL_IDX', 'unique': False, 'column_names': ['STATE', 'CITY']},
{'name': 'LOCAL_IDX', 'unique': False, 'column_names': ['_INDEX_ID', 'POPULATION', 'STATE', 'CITY']}])
pk_result = inspector.get_pk_constraint('US_POPULATION')
self.assertTrue(pk_result, {'constrained_columns': ['STATE', 'CITY'], 'name': 'MY_PK'})
finally:
connection.execute('drop table if exists us_population')
@unittest.skip("ORM feature not implemented")
def test_orm(self):
pass
def _create_engine(self):
''''Massage the properties that we use for the DBAPI tests so that they apply to
SQLAlchemy'''
url_parts = urlparse(TEST_DB_URL)
tls = url_parts.scheme.lower == 'https'
url_parts = url_parts._replace(scheme='phoenix')
connect_args = dict()
if TEST_DB_AUTHENTICATION:
connect_args.update(authentication=TEST_DB_AUTHENTICATION)
if TEST_DB_AVATICA_USER:
connect_args.update(avatica_user=TEST_DB_AVATICA_USER)
if TEST_DB_AVATICA_PASSWORD:
connect_args.update(avatica_password=TEST_DB_AVATICA_PASSWORD)
if TEST_DB_TRUSTSTORE:
connect_args.update(trustore=TEST_DB_TRUSTSTORE)
return db.create_engine(urlunparse(url_parts), tls=tls, connect_args=connect_args)