# blob: a1d6e2caabac9a398180f1e3c7e8ef4ce2b99ba3 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import logging
import warnings
from iotdb.Session import Session
from .Exceptions import ProgrammingError
# Module-level logger registered under the name "IoTDB"; used below to report
# statements that fail to execute.
logger = logging.getLogger("IoTDB")
class Cursor(object):
    """
    DB-API 2.0 style cursor that runs statements through a session object.

    The complete result of the most recent ``execute*()`` call is kept in
    memory and handed out through the ``fetch*()`` methods and the iterator
    protocol.
    """

    def __init__(self, connection, session: "Session", sqlalchemy_mode):
        """
        :param connection: owning connection; its ``is_close`` attribute is
            checked before every execution
        :param session: session whose ``execute_statement()`` runs the SQL
        :param sqlalchemy_mode: when truthy, ``execute()`` strips the
            ``FROM Time Index`` / ``FROM Time Name`` marker lines from the
            statement and uses them to re-position the time column
        """
        self.__connection = connection
        self.__session = session
        self.__sqlalchemy_mode = sqlalchemy_mode
        self.__arraysize = 1
        self.__is_close = False
        # Dict with keys "col_names", "col_types", "rows" and "row_count";
        # None until the first execute*() call and again after close().
        self.__result = None
        # Shared iterator over the rows of the current result.
        self.__rows = None
        self.__rowcount = -1

    @staticmethod
    def __make_result(col_names, col_types, rows, row_count):
        """Build the dict that stores the outcome of one execution."""
        return {
            "col_names": col_names,
            "col_types": col_types,
            "rows": rows,
            "row_count": row_count,
        }

    @property
    def description(self):
        """
        This read-only attribute is a sequence of 7-item sequences.

        Each item is ``(name, type_code, None, None, None, None, is_time)``
        where ``type_code`` is ``None`` in sqlalchemy mode and the ``.value``
        of the column type otherwise, and ``is_time`` is ``True`` for a column
        named ``"Time"``.

        :return: tuple of column descriptions, or ``None`` when the cursor is
            closed, nothing has been executed yet, or the last result has no
            columns
        """
        if (
            self.__is_close
            or self.__result is None
            or not self.__result["col_names"]
        ):
            return None

        col_names = self.__result["col_names"]
        col_types = self.__result["col_types"]
        skip_types = self.__sqlalchemy_mode is True
        # col_types is indexed lazily (not zipped): in sqlalchemy mode the
        # column list may have been rearranged by execute(), so the two
        # sequences are not guaranteed to have the same length.
        return tuple(
            (
                name,
                None if skip_types else col_types[i].value,
                None,
                None,
                None,
                None,
                name == "Time",
            )
            for i, name in enumerate(col_names)
        )

    @property
    def arraysize(self):
        """
        This read/write attribute specifies the number of rows to fetch at a time with .fetchmany().
        """
        return self.__arraysize

    @arraysize.setter
    def arraysize(self, value):
        """
        Set the arraysize.

        Values that cannot be converted with ``int()`` reset the size to 1.

        :param value: arraysize
        """
        try:
            self.__arraysize = int(value)
        except (TypeError, ValueError):
            # int(None) raises TypeError, int("abc") raises ValueError; both
            # mean "not a usable size", so fall back to the default.
            self.__arraysize = 1

    @property
    def rowcount(self):
        """
        This read-only attribute specifies the number of rows that the last
        .execute*() produced (for DQL statements like ``SELECT``) or affected
        (for DML statements like ``DELETE`` or ``INSERT`` return 0 if successful, -1 if unsuccessful).

        :return: ``-1`` when the cursor is closed or nothing has been executed
        """
        if self.__is_close or self.__result is None:
            return -1
        return self.__result.get("row_count", -1)

    def execute(self, operation, parameters=None):
        """
        Prepare and execute a database operation (query or command).

        A failing statement does not raise: the error is logged and the
        cursor holds an empty result whose ``row_count`` is ``-1``.

        :param operation: a database operation
        :param parameters: parameters of the operation, applied with the
            ``%`` operator when not ``None``
        :raises ProgrammingError: if the connection or the cursor is closed
        """
        if self.__connection.is_close:
            raise ProgrammingError("Connection closed!")
        if self.__is_close:
            raise ProgrammingError("Cursor closed!")

        sql = operation if parameters is None else operation % parameters

        time_index = []
        time_names = []
        if self.__sqlalchemy_mode:
            # Marker lines carry the positions / names for the time column;
            # they are removed from the statement before it is sent.
            sql_seqs = []
            for seq in sql.split("\n"):
                if "FROM Time Index" in seq:
                    time_index = [
                        int(index)
                        for index in seq.replace("FROM Time Index", "").split()
                    ]
                elif "FROM Time Name" in seq:
                    time_names = seq.replace("FROM Time Name", "").split()
                else:
                    sql_seqs.append(seq)
            sql = "\n".join(sql_seqs)

        try:
            data_set = self.__session.execute_statement(sql)
            col_names = None
            col_types = None
            rows = []
            if data_set:
                try:
                    data = data_set.todf()
                    if self.__sqlalchemy_mode and time_index:
                        # Drop the leading time column and re-insert its
                        # values at every requested position.
                        time_column = data.columns[0]
                        time_column_value = data.Time
                        del data[time_column]
                        for i, position in enumerate(time_index):
                            data.insert(position, time_names[i], time_column_value)
                    col_names = data.columns.tolist()
                    col_types = data_set.get_column_types()
                    rows = data.values.tolist()
                finally:
                    # Release the operation handle even when converting the
                    # data set fails part-way.
                    data_set.close_operation_handle()
            self.__result = self.__make_result(col_names, col_types, rows, len(rows))
        except Exception:
            # Deliberate best effort: report the failure (with traceback)
            # and expose it through rowcount == -1 instead of raising.
            logger.exception("failed to execute statement:{}".format(sql))
            self.__result = self.__make_result(None, None, [], -1)

        self.__rows = iter(self.__result["rows"])

    def executemany(self, operation, seq_of_parameters=None):
        """
        Prepare a database operation (query or command) and then execute it
        against all parameter sequences or mappings found in the sequence
        ``seq_of_parameters``

        The rows of all executions are concatenated. ``rowcount`` reports the
        total number of accumulated rows, unless the final execution failed,
        in which case it stays ``-1``. An empty ``seq_of_parameters`` executes
        nothing and leaves an empty result with a row count of 0.

        :param operation: a database operation
        :param seq_of_parameters: pyformat style parameter list of the operation
        :raises ProgrammingError: if the connection or the cursor is closed
        """
        if self.__connection.is_close:
            raise ProgrammingError("Connection closed!")
        if self.__is_close:
            raise ProgrammingError("Cursor closed!")

        if seq_of_parameters is None:
            self.execute(operation)
            return

        rows = []
        executed = False
        for parameters in seq_of_parameters:
            self.execute(operation, parameters)
            rows.extend(self.__result["rows"])
            executed = True

        if not executed:
            # Nothing ran, so there is no per-statement result to extend.
            self.__result = self.__make_result(None, None, rows, 0)
        else:
            self.__result["rows"] = rows
            if self.__result["row_count"] != -1:
                # Keep rowcount in step with the rows fetch*() will return.
                self.__result["row_count"] = len(rows)
        self.__rows = iter(rows)

    def fetchone(self):
        """
        Fetch the next row of a query result set, returning a single sequence,
        or None when no more data is available.
        Alias for ``next()``.
        """
        try:
            return self.next()
        except StopIteration:
            return None

    def fetchmany(self, count=None):
        """
        Fetch the next set of rows of a query result, returning a sequence of
        sequences (e.g. a list of tuples). An empty sequence is returned when
        no more rows are available.

        :param count: maximum number of rows; defaults to ``arraysize``.
            A count of 0 fetches all remaining rows.
        """
        if count is None:
            count = self.__arraysize
        if count == 0:
            return self.fetchall()
        result = []
        for _ in range(count):
            try:
                result.append(self.next())
            except StopIteration:
                # Rows are exhausted; further attempts cannot succeed.
                break
        return result

    def fetchall(self):
        """
        Fetch all (remaining) rows of a query result, returning them as a
        sequence of sequences (e.g. a list of tuples). Note that the cursor's
        arraysize attribute can affect the performance of this operation.
        """
        result = []
        while True:
            try:
                result.append(self.next())
            except StopIteration:
                return result

    def next(self):
        """
        Return the next row of a query result set, respecting if cursor was
        closed.

        :raises ProgrammingError: if the cursor is closed or nothing has been
            executed yet
        :raises StopIteration: when no rows are left
        """
        # The closed check comes first: close() also clears the result, so
        # testing the result first would hide the "closed" condition.
        if self.__is_close:
            raise ProgrammingError("Cursor closed!")
        if self.__result is None:
            raise ProgrammingError(
                "No result available. execute() or executemany() must be called first."
            )
        return next(self.__rows)

    __next__ = next

    def close(self):
        """
        Close the cursor now.
        """
        self.__is_close = True
        # Drop the buffered result so its rows can be reclaimed.
        self.__result = None
        self.__rows = None

    def setinputsizes(self, sizes):
        """
        Not supported method.
        """
        pass

    def setoutputsize(self, size, column=None):
        """
        Not supported method.
        """
        pass

    def __iter__(self):
        """
        Support iterator interface:
        http://legacy.python.org/dev/peps/pep-0249/#iter
        This iterator is shared. Advancing this iterator will advance other
        iterators created from this cursor.
        """
        warnings.warn("DB-API extension cursor.__iter__() used")
        return self

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()