blob: 156844ec6f274d67af6a044f508f19e60c47efeb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.IO;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using Apache.IoTDB.DataStructure;
namespace Apache.IoTDB.Data
{
/// <summary>
/// Provides methods for reading the result of a command executed against a IoTDB database.
/// </summary>
public class IoTDBDataReader : DbDataReader
{
private readonly SessionPool _IoTDB;
private readonly IoTDBCommand _command;
private bool _hasRows;
private readonly int _recordsAffected;
private bool _closed;
private readonly List<string> _metas;
private bool _closeConnection;
private int _fieldCount;
RowRecord rowdata = null;
SessionDataSet _dataSet;
internal IoTDBDataReader(IoTDBCommand IoTDBCommand, SessionDataSet dataSet, bool closeConnection)
{
_IoTDB = IoTDBCommand.Connection._IoTDB;
_command = IoTDBCommand;
_closeConnection = closeConnection;
_fieldCount = dataSet.ColumnNames.Count;
_hasRows = dataSet.RowCount > 0;
_recordsAffected = dataSet.RowCount;
_closed = _closeConnection;
_metas = dataSet.ColumnNames;
_dataSet = dataSet;
}
/// <summary>
/// Gets the depth of nesting for the current row. Always zero.
/// </summary>
/// <value>The depth of nesting for the current row.</value>
public override int Depth => 0;
/// <summary>
/// Gets the number of columns in the current row.
/// </summary>
/// <value>The number of columns in the current row.</value>
public override int FieldCount => _fieldCount + 1;
/// <summary>
/// Gets a value indicating whether the data reader contains any rows.
/// </summary>
/// <value>A value indicating whether the data reader contains any rows.</value>
public override bool HasRows
=> _hasRows;
/// <summary>
/// Gets a value indicating whether the data reader is closed.
/// </summary>
/// <value>A value indicating whether the data reader is closed.</value>
public override bool IsClosed
=> _closed;
/// <summary>
/// Gets the number of rows inserted, updated, or deleted. -1 for SELECT statements.
/// </summary>
/// <value>The number of rows inserted, updated, or deleted.</value>
public override int RecordsAffected
{
get
{
return _recordsAffected;
}
}
/// <summary>
/// Gets the value of the specified column.
/// </summary>
/// <param name="name">The name of the column. The value is case-sensitive.</param>
/// <returns>The value.</returns>
public override object this[string name]
=> this[GetOrdinal(name)];
/// <summary>
/// Gets the value of the specified column.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value.</returns>
public override object this[int ordinal]
=> GetValue(ordinal);
/// <summary>
/// Gets an enumerator that can be used to iterate through the rows in the data reader.
/// </summary>
/// <returns>The enumerator.</returns>
public override IEnumerator GetEnumerator()
=> new DbEnumerator(this, closeReader: false);
/// <summary>
/// Advances to the next row in the result set.
/// </summary>
/// <returns>true if there are more rows; otherwise, false.</returns>
public override bool Read()
{
if (_closed)
{
throw new InvalidOperationException($"DataReaderClosed{nameof(Read)}");
}
if (_dataSet.HasNext())
{
rowdata = _dataSet.Next();
}
else
{
rowdata = null;
}
return rowdata != null;
}
/// <summary>
/// Advances to the next result set for batched statements.
/// </summary>
/// <returns>true if there are more result sets; otherwise, false.</returns>
public override bool NextResult()
{
return Read();
}
/// <summary>
/// Closes the data reader.
/// </summary>
public override void Close()
=> Dispose(true);
/// <summary>
/// Releases any resources used by the data reader and closes it.
/// </summary>
/// <param name="disposing">
/// true to release managed and unmanaged resources; false to release only unmanaged resources.
/// </param>
protected override void Dispose(bool disposing)
{
if (!disposing)
{
return;
}
_dataSet.Close().GetAwaiter().GetResult();
_command.DataReader = null;
if (_closeConnection)
{
_command.Connection.Close();
_closed = true;
}
rowdata = null;
}
/// <summary>
/// Gets the name of the specified column.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The name of the column.</returns>
public override string GetName(int ordinal)
{
return ordinal == 0 ? "timestamp" : rowdata.Measurements[ordinal - 1];
}
/// <summary>
/// Gets the ordinal of the specified column.
/// </summary>
/// <param name="name">The name of the column.</param>
/// <returns>The zero-based column ordinal.</returns>
public override int GetOrdinal(string name)
=> "timestamp" == name ? 0 : rowdata.Measurements.IndexOf(name) + 1;
public override string GetDataTypeName(int ordinal)
{
return GetFieldType(ordinal).Name;
}
/// <summary>
/// Gets the data type of the specified column.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The data type of the column.</returns>
public override Type GetFieldType(int ordinal)
{
return ordinal == 0 ? typeof(DateTime) : rowdata.GetCrlType(ordinal - 1);
}
/// <summary>
/// Gets a value indicating whether the specified column is <see cref="DBNull" />.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>true if the specified column is <see cref="DBNull" />; otherwise, false.</returns>
public override bool IsDBNull(int ordinal)
=> GetValue(ordinal) == DBNull.Value;
/// <summary>
/// Gets the value of the specified column as a <see cref="bool" />.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the column.</returns>
public override bool GetBoolean(int ordinal) => GetFieldValue<bool>(ordinal);
/// <summary>
/// Gets the value of the specified column as a <see cref="byte" />.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the column.</returns>
public override byte GetByte(int ordinal) => GetFieldValue<byte>(ordinal);
/// <summary>
/// Gets the value of the specified column as a <see cref="char" />.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the column.</returns>
public override char GetChar(int ordinal) => GetFieldValue<char>(ordinal);
/// <summary>
/// Gets the value of the specified column as a <see cref="DateTime" />.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the column.</returns>
public override DateTime GetDateTime(int ordinal)
{
return GetFieldValue<DateTime>(ordinal);
}
/// <summary>
/// Gets the value of the specified column as a <see cref="DateTimeOffset" />.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the column.</returns>
public virtual DateTimeOffset GetDateTimeOffset(int ordinal)
{
return GetDateTime(ordinal);
}
/// <summary>
/// Gets the value of the specified column as a <see cref="TimeSpan" />.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the column.</returns>
public virtual TimeSpan GetTimeSpan(int ordinal)
{
var val = GetInt64(ordinal);
return TimeSpan.FromMilliseconds(val);
}
/// <summary>
/// Gets the value of the specified column as a <see cref="decimal" />.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the column.</returns>
public override decimal GetDecimal(int ordinal) => (decimal)GetValue(ordinal);
/// <summary>
/// Gets the value of the specified column as a <see cref="double" />.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the column.</returns>
public override double GetDouble(int ordinal) => (double)GetValue(ordinal);
/// <summary>
/// Gets the value of the specified column as a <see cref="float" />.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the column.</returns>
public override float GetFloat(int ordinal) => (float)GetValue(ordinal);
/// <summary>
/// Gets the value of the specified column as a <see cref="Guid" />.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the column.</returns>
public override Guid GetGuid(int ordinal) => GetFieldValue<Guid>(ordinal);
/// <summary>
/// Gets the value of the specified column as a <see cref="short" />.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the column.</returns>
public override short GetInt16(int ordinal) => (short)GetValue(ordinal);
/// <summary>
/// Gets the value of the specified column as a <see cref="ushort" />.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the column.</returns>
public ushort GetUInt16(int ordinal) => (ushort)GetValue(ordinal);
/// <summary>
/// Gets the value of the specified column as a <see cref="int" />.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the column.</returns>
public override int GetInt32(int ordinal) => (int)GetValue(ordinal);
/// <summary>
/// Gets the value of the specified column as a <see cref="uint" />.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the column.</returns>
public uint GetUInt32(int ordinal) => (uint)GetValue(ordinal);
/// <summary>
/// Gets the value of the specified column as a <see cref="long" />.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the column.</returns>
public override long GetInt64(int ordinal) => (long)GetValue(ordinal);
/// <summary>
/// Gets the value of the specified column as a <see cref="ulong" />.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the column.</returns>
public ulong GetUInt64(int ordinal) => (ulong)GetValue(ordinal);
/// <summary>
/// Gets the value of the specified column as a <see cref="string" />.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the column.</returns>
public override string GetString(int ordinal) => (string)GetValue(ordinal);
/// <summary>
/// Reads a stream of bytes from the specified column. Not supported.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <param name="dataOffset">The index from which to begin the read operation.</param>
/// <param name="buffer">The buffer into which the data is copied.</param>
/// <param name="bufferOffset">The index to which the data will be copied.</param>
/// <param name="length">The maximum number of bytes to read.</param>
/// <returns>The actual number of bytes read.</returns>
public override long GetBytes(int ordinal, long dataOffset, byte[] buffer, int bufferOffset, int length)
{
throw new NotSupportedException();
}
/// <summary>
/// Reads a stream of characters from the specified column. Not supported.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <param name="dataOffset">The index from which to begin the read operation.</param>
/// <param name="buffer">The buffer into which the data is copied.</param>
/// <param name="bufferOffset">The index to which the data will be copied.</param>
/// <param name="length">The maximum number of characters to read.</param>
/// <returns>The actual number of characters read.</returns>
public override long GetChars(int ordinal, long dataOffset, char[] buffer, int bufferOffset, int length)
=> throw new NotSupportedException();
/// <summary>
/// Retrieves data as a Stream. Not supported.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The returned object.</returns>
public override Stream GetStream(int ordinal)
=> throw new NotSupportedException();
/// <summary>
/// Gets the value of the specified column.
/// </summary>
/// <typeparam name="T">The type of the value.</typeparam>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the column.</returns>
public override T GetFieldValue<T>(int ordinal) => (T)Convert.ChangeType(GetValue(ordinal), typeof(T));
/// <summary>
/// Gets the value of the specified column.
/// </summary>
/// <param name="ordinal">The zero-based column ordinal.</param>
/// <returns>The value of the column.</returns>
public override object GetValue(int ordinal)
{
object result;
if (ordinal == 0)
{
result = rowdata.GetDateTime();
}
else
{
result = rowdata.Values[ordinal - 1];
}
if (result == null)
{
result = DBNull.Value;
}
return result;
}
/// <summary>
/// Gets the column values of the current row.
/// </summary>
/// <param name="values">An array into which the values are copied.</param>
/// <returns>The number of values copied into the array.</returns>
public override int GetValues(object[] values)
{
int count = 0;
values[0] = rowdata.GetDateTime();
for (int i = 0; i < _fieldCount; i++)
{
var obj = rowdata.Values[i];
if (obj != null)
{
values[i + 1] = obj;
count++;
}
}
return count;
}
/// <summary>
/// Returns a System.Data.DataTable that describes the column metadata of the System.Data.Common.DbDataReader.
/// </summary>
/// <returns>A System.Data.DataTable that describes the column metadata.</returns>
public override DataTable GetSchemaTable()
{
if (_dataSet.HasNext())
{
rowdata = _dataSet.GetRow();
}
var schemaTable = new DataTable("SchemaTable");
if (_metas != null && rowdata != null)
{
var ColumnName = new DataColumn(SchemaTableColumn.ColumnName, typeof(string));
var ColumnOrdinal = new DataColumn(SchemaTableColumn.ColumnOrdinal, typeof(int));
var ColumnSize = new DataColumn(SchemaTableColumn.ColumnSize, typeof(int));
var NumericPrecision = new DataColumn(SchemaTableColumn.NumericPrecision, typeof(short));
var NumericScale = new DataColumn(SchemaTableColumn.NumericScale, typeof(short));
var DataType = new DataColumn(SchemaTableColumn.DataType, typeof(Type));
var DataTypeName = new DataColumn("DataTypeName", typeof(string));
var IsLong = new DataColumn(SchemaTableColumn.IsLong, typeof(bool));
var AllowDBNull = new DataColumn(SchemaTableColumn.AllowDBNull, typeof(bool));
var IsUnique = new DataColumn(SchemaTableColumn.IsUnique, typeof(bool));
var IsKey = new DataColumn(SchemaTableColumn.IsKey, typeof(bool));
var IsAutoIncrement = new DataColumn(SchemaTableOptionalColumn.IsAutoIncrement, typeof(bool));
var BaseCatalogName = new DataColumn(SchemaTableOptionalColumn.BaseCatalogName, typeof(string));
var BaseSchemaName = new DataColumn(SchemaTableColumn.BaseSchemaName, typeof(string));
var BaseTableName = new DataColumn(SchemaTableColumn.BaseTableName, typeof(string));
var BaseColumnName = new DataColumn(SchemaTableColumn.BaseColumnName, typeof(string));
var BaseServerName = new DataColumn(SchemaTableOptionalColumn.BaseServerName, typeof(string));
var IsAliased = new DataColumn(SchemaTableColumn.IsAliased, typeof(bool));
var IsExpression = new DataColumn(SchemaTableColumn.IsExpression, typeof(bool));
var columns = schemaTable.Columns;
columns.Add(ColumnName);
columns.Add(ColumnOrdinal);
columns.Add(ColumnSize);
columns.Add(NumericPrecision);
columns.Add(NumericScale);
columns.Add(IsUnique);
columns.Add(IsKey);
columns.Add(BaseServerName);
columns.Add(BaseCatalogName);
columns.Add(BaseColumnName);
columns.Add(BaseSchemaName);
columns.Add(BaseTableName);
columns.Add(DataType);
columns.Add(DataTypeName);
columns.Add(AllowDBNull);
columns.Add(IsAliased);
columns.Add(IsExpression);
columns.Add(IsAutoIncrement);
columns.Add(IsLong);
var schemaRow1 = schemaTable.NewRow();
var columnName1 = "timestamp";
schemaRow1[ColumnName] = columnName1;
schemaRow1[ColumnOrdinal] = 0;
schemaRow1[NumericPrecision] = DBNull.Value;
schemaRow1[NumericScale] = DBNull.Value;
schemaRow1[BaseServerName] = _command.Connection.DataSource;
schemaRow1[BaseColumnName] = columnName1;
schemaRow1[BaseSchemaName] = DBNull.Value;
var tableName1 = string.Empty;
schemaRow1[BaseTableName] = tableName1;
schemaRow1[DataType] = typeof(DateTime);
schemaRow1[DataTypeName] = nameof(DateTime);
schemaRow1[IsExpression] = columnName1 == null;
schemaRow1[IsLong] = DBNull.Value;
schemaRow1[IsKey] = true;
schemaTable.Rows.Add(schemaRow1);
for (var i = 1; i < rowdata.Measurements.Count + 1; i++)
{
var schemaRow = schemaTable.NewRow();
var columnName = rowdata.Measurements[i - 1];
schemaRow[ColumnName] = columnName;
schemaRow[ColumnOrdinal] = i;
schemaRow[NumericPrecision] = DBNull.Value;
schemaRow[NumericScale] = DBNull.Value;
schemaRow[BaseServerName] = _command.Connection.DataSource;
schemaRow[BaseColumnName] = columnName;
schemaRow[BaseSchemaName] = DBNull.Value;
var tableName = string.Empty;
schemaRow[BaseTableName] = tableName;
schemaRow[DataType] = GetFieldType(i);
schemaRow[DataTypeName] = GetDataTypeName(i);
schemaRow[IsExpression] = columnName == null;
schemaRow[IsLong] = DBNull.Value;
schemaRow[IsKey] = false;
schemaRow[AllowDBNull] = true;
schemaTable.Rows.Add(schemaRow);
}
}
return schemaTable;
}
}
}