blob: 0c04b116943d0a4d7b6ea6881cc9e8707ea75d72 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import phoenixdb.cursor
from phoenixdb.connection import Connection
from phoenixdb.errors import InternalError, ProgrammingError
from phoenixdb.tests import DatabaseTestCase, TEST_DB_URL
@unittest.skipIf(TEST_DB_URL is None, "these tests require the PHOENIXDB_TEST_DB_URL environment variable set to a clean database")
class PhoenixDatabaseTest(DatabaseTestCase):
    """Integration tests exercising phoenixdb against a live database.

    The whole class is skipped when ``TEST_DB_URL`` is ``None``. The tests
    create and drop tables, so they expect a clean database.
    """

    def test_select_literal(self):
        """SELECT with a literal predicate returns the matching rows in order."""
        with self.conn.cursor() as cursor:
            self.createTable("test", "CREATE TABLE {table} (id INTEGER PRIMARY KEY, text VARCHAR)")
            cursor.executemany("UPSERT INTO test VALUES (?, ?)", [[i, 'text {}'.format(i)] for i in range(10)])

        with self.conn.cursor() as cursor:
            # itersize is smaller than the 8 expected rows; presumably this
            # makes fetchall() span several fetch round-trips.
            cursor.itersize = 4
            cursor.execute("SELECT * FROM test WHERE id>1 ORDER BY id")
            self.assertEqual(cursor.fetchall(), [[i, 'text {}'.format(i)] for i in range(2, 10)])

    def test_select_parameter(self):
        """SELECT with a bound parameter returns the matching rows in order."""
        with self.conn.cursor() as cursor:
            self.createTable("test", "CREATE TABLE {table} (id INTEGER PRIMARY KEY, text VARCHAR)")
            cursor.executemany("UPSERT INTO test VALUES (?, ?)", [[i, 'text {}'.format(i)] for i in range(10)])

        with self.conn.cursor() as cursor:
            # Same small itersize as in test_select_literal.
            cursor.itersize = 4
            cursor.execute("SELECT * FROM test WHERE id>? ORDER BY id", [1])
            self.assertEqual(cursor.fetchall(), [[i, 'text {}'.format(i)] for i in range(2, 10)])

    def _check_dict_cursor(self, cursor):
        """Insert one row through *cursor* and expect it back as a dict.

        The expected dict is keyed by the upper-case column names.
        """
        self.createTable("test", "CREATE TABLE {table} (id INTEGER PRIMARY KEY, text VARCHAR)")
        cursor.execute("UPSERT INTO test VALUES (?, ?)", [1, 'text 1'])
        cursor.execute("SELECT * FROM test ORDER BY id")
        self.assertEqual(cursor.fetchall(), [{'ID': 1, 'TEXT': 'text 1'}])

    def test_dict_cursor_default_parameter(self):
        """DictCursor passed as ``cursor_factory`` when (re)opening the connection."""
        self.reopen(autocommit=True, cursor_factory=phoenixdb.cursor.DictCursor)

        with self.conn.cursor() as cursor:
            self._check_dict_cursor(cursor)

    def test_dict_cursor_default_attribute(self):
        """DictCursor set through the connection's ``cursor_factory`` attribute."""
        self.conn.cursor_factory = phoenixdb.cursor.DictCursor

        with self.conn.cursor() as cursor:
            self._check_dict_cursor(cursor)

    def test_dict_cursor(self):
        """DictCursor requested explicitly in the ``cursor()`` call."""
        # NOTE(review): the connection is also reopened with DictCursor as its
        # default factory, so the per-cursor argument is not tested in
        # isolation - confirm whether that is intended.
        self.reopen(autocommit=True, cursor_factory=phoenixdb.cursor.DictCursor)

        with self.conn.cursor(cursor_factory=phoenixdb.cursor.DictCursor) as cursor:
            self._check_dict_cursor(cursor)

    def test_schema(self):
        """Create, fill and query a table that lives in an explicit schema."""
        with self.conn.cursor() as cursor:
            try:
                cursor.execute("CREATE SCHEMA IF NOT EXISTS test_schema")
            except InternalError as e:
                # Skip instead of failing when the error message points at
                # the namespace-mapping setting; anything else is re-raised.
                if "phoenix.schema.isNamespaceMappingEnabled" in e.message:
                    self.skipTest(e.message)
                raise

            self.createTable("test_schema.test", "CREATE TABLE {table} (id INTEGER PRIMARY KEY, text VARCHAR)")
            cursor.execute("UPSERT INTO test_schema.test VALUES (?, ?)", [1, 'text 1'])
            cursor.execute("SELECT * FROM test_schema.test ORDER BY id")
            self.assertEqual(cursor.fetchall(), [[1, 'text 1']])

    def test_transaction(self):
        """Commit, rollback and the implicit commit when enabling autocommit."""
        self.reopen(autocommit=False)

        with self.conn.cursor() as cursor:
            self.createTable("test", "CREATE TABLE {table} (id INTEGER PRIMARY KEY, text VARCHAR)")

            # The upserted row is expected to be invisible before commit.
            cursor.execute("UPSERT INTO test VALUES (?, ?)", [1, 'one'])
            cursor.execute("SELECT * FROM test ORDER BY id")
            self.assertEqual(cursor.fetchall(), [])

            self.conn.commit()
            cursor.execute("SELECT * FROM test ORDER BY id")
            self.assertEqual(cursor.fetchall(), [[1, 'one']])
            self.assertEqual(self.conn.autocommit, False)

            # A rolled-back upsert must leave the table unchanged.
            cursor.execute("UPSERT INTO test VALUES (?, ?)", [2, 'two'])
            self.conn.rollback()
            cursor.execute("SELECT * FROM test ORDER BY id")
            self.assertEqual(cursor.fetchall(), [[1, 'one']])
            self.assertEqual(self.conn.autocommit, False)

            cursor.execute("UPSERT INTO test VALUES (?, ?)", [2, 'two'])
            # Since we expose the JDBC semantics, this is an implicit commit
            self.conn.autocommit = True
            cursor.execute("SELECT * FROM test ORDER BY id")
            self.assertEqual(cursor.fetchall(), [[1, 'one'], [2, 'two']])

    def test_conn_props(self):
        """``Connection._map_conn_props`` splits Phoenix and Avatica properties."""
        phoenix_args, avatica_args = Connection._map_conn_props(
            {'autoCommit': True,
             'readonly': True,
             'transactionIsolation': 3,
             'schema': 'bubu',
             'phoenixArg': 'phoenixArg'})
        self.assertEqual(phoenix_args, {'phoenixArg': 'phoenixArg'})
        # The 'readonly' input key is expected back as 'readOnly'.
        self.assertEqual(avatica_args, {'autoCommit': True,
                                        'readOnly': True,
                                        'transactionIsolation': 3,
                                        'schema': 'bubu'})

    def test_meta(self):
        """Catalog, schema, table, column and table-type metadata queries."""
        with self.conn.cursor() as cursor:
            try:
                # Remove leftovers of earlier (possibly aborted) runs.
                cursor.execute('drop table if exists DEFAULT_TABLE')
                cursor.execute('drop table if exists A_SCHEMA.A_TABLE')
                cursor.execute('drop table if exists B_SCHEMA.B_TABLE')

                cursor.execute('create table DEFAULT_TABLE (ID integer primary key)')
                cursor.execute('create table A_SCHEMA.A_TABLE (ID_A integer primary key)')
                cursor.execute('create table B_SCHEMA.B_TABLE (ID_B integer primary key)')

                meta = self.conn.meta()

                self.assertEqual(meta.get_catalogs(), [])

                self.assertEqual(meta.get_schemas(), [
                    {'TABLE_SCHEM': '', 'TABLE_CATALOG': ''},
                    {'TABLE_SCHEM': 'A_SCHEMA', 'TABLE_CATALOG': ''},
                    {'TABLE_SCHEM': 'B_SCHEMA', 'TABLE_CATALOG': ''},
                    {'TABLE_SCHEM': 'SYSTEM', 'TABLE_CATALOG': ''}])

                self.assertEqual(meta.get_schemas(schemaPattern=''), [
                    {'TABLE_SCHEM': '', 'TABLE_CATALOG': ''}])

                self.assertEqual(meta.get_schemas(schemaPattern='A_SCHEMA'), [
                    {'TABLE_SCHEM': 'A_SCHEMA', 'TABLE_CATALOG': ''}])

                a_tables = meta.get_tables()
                self.assertGreater(len(a_tables), 3)  # Don't know how many tables SYSTEM has

                a_tables = meta.get_tables(schemaPattern='')
                self.assertEqual(len(a_tables), 1)
                self.assertEqual(a_tables[0]['TABLE_NAME'], 'DEFAULT_TABLE')

                a_tables = meta.get_tables(schemaPattern='A_SCHEMA')
                self.assertEqual(len(a_tables), 1)
                self.assertEqual(a_tables[0]['TABLE_NAME'], 'A_TABLE')

                a_columns = meta.get_columns(schemaPattern='A_SCHEMA', tableNamePattern='A_TABLE')
                self.assertEqual(len(a_columns), 1)
                self.assertEqual(a_columns[0]['COLUMN_NAME'], 'ID_A')

                # Query the table types once; the result may contain more
                # entries than the ones required here.
                table_types = meta.get_table_types()
                for expected_type in [
                        {'TABLE_TYPE': 'INDEX'},
                        {'TABLE_TYPE': 'SEQUENCE'},
                        {'TABLE_TYPE': 'SYSTEM TABLE'},
                        {'TABLE_TYPE': 'TABLE'},
                        {'TABLE_TYPE': 'VIEW'}]:
                    self.assertIn(expected_type, table_types)

                self.assertEqual(meta.get_type_info(), [])

            finally:
                cursor.execute('drop table if exists DEFAULT_TABLE')
                cursor.execute('drop table if exists A_SCHEMA.A_TABLE')
                cursor.execute('drop table if exists B_SCHEMA.B_TABLE')

    def test_meta2(self):
        """Primary-key and index metadata queries."""
        with self.conn.cursor() as cursor:
            try:
                # Remove leftovers of earlier (possibly aborted) runs.
                cursor.execute('drop table if exists DEFAULT_TABLE')
                cursor.execute('drop table if exists A_SCHEMA.A_TABLE')
                cursor.execute('drop table if exists B_SCHEMA.B_TABLE')

                cursor.execute('''create table DEFAULT_TABLE (ID integer not null, ID2 varchar not null,
                V1 integer, V2 varchar, constraint PK PRIMARY KEY (ID DESC, ID2 ASC))''')
                cursor.execute('CREATE INDEX GLOBAL_IDX ON DEFAULT_TABLE (V1) INCLUDE (V2)')
                cursor.execute('CREATE LOCAL INDEX LOCAL_IDX ON DEFAULT_TABLE (V1)')
                cursor.execute('create table A_SCHEMA.A_TABLE (ID_A integer primary key)')
                cursor.execute('create table B_SCHEMA.B_TABLE (ID_B integer primary key)')

                meta = self.conn.meta()

                # Reference rows for the two primary-key columns of DEFAULT_TABLE.
                expected_primary_keys = [
                    {'ASC_OR_DESC': '\x00\x00\x00D',
                     'COLUMN_NAME': 'ID',
                     'COLUMN_SIZE': None,
                     'DATA_TYPE': 4,
                     'KEY_SEQ': 1,
                     'PK_NAME': 'PK',
                     'TABLE_CAT': None,
                     'TABLE_NAME': 'DEFAULT_TABLE',
                     'TABLE_SCHEM': None,
                     'TYPE_ID': 4,
                     'TYPE_NAME': 'INTEGER',
                     'VIEW_CONSTANT': None},
                    {'ASC_OR_DESC': '\x00\x00\x00A',
                     'COLUMN_NAME': 'ID2',
                     'COLUMN_SIZE': None,
                     'DATA_TYPE': 12,
                     'KEY_SEQ': 2,
                     'PK_NAME': 'PK',
                     'TABLE_CAT': None,
                     'TABLE_NAME': 'DEFAULT_TABLE',
                     'TABLE_SCHEM': None,
                     'TYPE_ID': 12,
                     'TYPE_NAME': 'VARCHAR',
                     'VIEW_CONSTANT': None}]
                primary_keys = meta.get_primary_keys(table='DEFAULT_TABLE')
                self.assertEqual(len(primary_keys), len(expected_primary_keys))
                # NOTE(review): the previous assertTrue(len(...), [...]) used
                # the reference rows only as the failure message, so nothing
                # was compared. Only the columns below are checked here; the
                # raw ASC_OR_DESC values could not be confirmed - tighten to
                # full row equality once verified against the target server.
                compared_columns = ('TABLE_NAME', 'COLUMN_NAME', 'KEY_SEQ', 'PK_NAME')
                self.assertEqual(
                    [[row[column] for column in compared_columns] for row in primary_keys],
                    [[row[column] for column in compared_columns] for row in expected_primary_keys])

                self.assertEqual(len(meta.get_primary_keys(schema='A_SCHEMA', table='A_TABLE')), 1)

                # B_TABLE does not exist in A_SCHEMA, so the lookup must raise.
                with self.assertRaises(ProgrammingError):
                    meta.get_primary_keys(schema='A_SCHEMA', table='B_TABLE')

                self.maxDiff = None

                self.assertEqual(meta.get_index_info(table='NON_EXISTENT'), [])

                self.assertGreater(len(meta.get_index_info(table='DEFAULT_TABLE')), 1)

            finally:
                cursor.execute('drop table if exists DEFAULT_TABLE')
                cursor.execute('drop table if exists A_SCHEMA.A_TABLE')
                cursor.execute('drop table if exists B_SCHEMA.B_TABLE')

    @unittest.skip("https://issues.apache.org/jira/browse/PHOENIX-6004")
    def test_case_sensitivity(self):
        """Quoted versus unquoted identifiers in SQL and in metadata patterns.

        Currently skipped; see the issue referenced in the decorator.
        """
        with self.conn.cursor() as cursor:
            try:
                cursor.execute('drop table if exists AAA')
                cursor.execute('drop table if exists "aaa"')
                cursor.execute('drop table if exists "Aaa"')

                cursor.execute('create table AAA (ID integer primary key, YYY integer)')
                cursor.execute('create table "aaa" ("ID_x" integer primary key, YYY integer, "Yyy" integer, "yyy" integer)')
                cursor.execute('create table "Aaa" (ID_X integer primary key, ZZZ integer, "Zzz" integer, "zzz" integer)')

                cursor.execute('upsert into AAA values (1, 2)')
                cursor.execute('upsert into "aaa" values (11, 12, 13, 14)')
                cursor.execute('upsert into "Aaa" values (21, 22, 23, 24)')

                cursor.execute('select YYY from AAA')
                self.assertEqual(cursor.fetchone(), [2])

                cursor.execute('select YYY from "aaa"')
                self.assertEqual(cursor.fetchone(), [12])

                cursor.execute('select "YYY" from "aaa"')
                self.assertEqual(cursor.fetchone(), [12])

                cursor.execute('select "Yyy" from "aaa"')
                self.assertEqual(cursor.fetchone(), [13])

                meta = self.conn.meta()

                self.assertEqual(len(meta.get_tables(schemaPattern='')), 3)

                self.assertEqual(len(meta.get_tables(schemaPattern='',
                                                     tableNamePattern='AAA')), 1)
                self.assertEqual(len(meta.get_tables(schemaPattern='',
                                                     tableNamePattern='"aaa"')), 1)

                # get_columns returns a list of rows, so compare its length
                # (not the list itself) against the expected count.
                self.assertEqual(len(meta.get_columns(tableNamePattern='AAA',
                                                      columnNamePattern='YYY')), 1)
                self.assertEqual(len(meta.get_columns(tableNamePattern='AAA',
                                                      columnNamePattern='yyy')), 1)
                self.assertEqual(len(meta.get_columns(tableNamePattern='AAA',
                                                      columnNamePattern='"yyy"')), 0)
            finally:
                cursor.execute('drop table if exists AAA')
                cursor.execute('drop table if exists "aaa"')
                cursor.execute('drop table if exists "Aaa"')