blob: 43351be251690a1a1e9dc0ff839fd295d8e3b721 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from threading import Thread
from iotdb.IoTDBContainer import IoTDBContainer
from iotdb.SessionPool import create_session_pool, PoolConfig
CONTAINER_NAME = "iotdb:dev"
def test_session_pool():
with IoTDBContainer(CONTAINER_NAME) as db:
db: IoTDBContainer
max_pool_size = 2
pool_config = PoolConfig(
db.get_container_host_ip(),
db.get_exposed_port(6667),
"root",
"root",
[],
1024,
"Asia/Shanghai",
3,
)
session_pool = create_session_pool(pool_config, max_pool_size, 3000)
session = session_pool.get_session()
assert session.is_open() is True
session2 = session_pool.get_session()
timeout = False
try:
session_pool.get_session()
except TimeoutError as e:
timeout = True
assert str(
e
) == "Wait to get session timeout in SessionPool, current pool size: " + str(
max_pool_size
)
assert timeout is True
Thread(target=lambda: session_pool.put_back(session2)).start()
session3 = session_pool.get_session()
session_pool.close()
is_closed = False
try:
session_pool.get_session()
except ConnectionError as e:
is_closed = True
assert str(e) == "SessionPool has already been closed."
assert is_closed is True
is_closed = False
try:
session_pool.put_back(session3)
except ConnectionError as e:
is_closed = True
assert (
str(e)
== "SessionPool has already been closed, please close the session manually."
)
assert is_closed is True
def test_session_pool_by_node_urls():
with IoTDBContainer(CONTAINER_NAME) as db:
db: IoTDBContainer
node_url = db.get_container_host_ip() + ":" + str(db.get_exposed_port(6667))
max_pool_size = 1
pool_config = PoolConfig(
node_urls=[node_url], user_name="root", password="root"
)
session_pool = create_session_pool(pool_config, max_pool_size, 3000)
session = session_pool.get_session()
timeout = False
try:
session_pool.get_session()
except TimeoutError as e:
timeout = True
assert str(
e
) == "Wait to get session timeout in SessionPool, current pool size: " + str(
max_pool_size
)
assert timeout is True
session_pool.put_back(session)
session2 = session_pool.get_session()
assert session2.is_open() is True
session_pool.put_back(session2)
session_pool.close()
assert session2.is_open() is False
check = False
try:
session_pool.get_session()
except ConnectionError as e:
check = True
assert str(e) == "SessionPool has already been closed."
assert check is True