blob: 0cb5d3e1b73e2779d201e970049b4cc67f0d67a3 [file]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "SessionDataSet.h"
#include <stdexcept>
#include "IoTDBRpcDataSet.h"
#include "SessionDataSetFactory.h"
using namespace std;
/** Private implementation state of SessionDataSet, defined in this source file. */
struct SessionDataSet::Impl {
// RPC-backed result set that SessionDataSet and its DataIterator forward their calls to.
std::shared_ptr<IoTDBRpcDataSet> iotdbRpcDataSet_;
};
std::unique_ptr<SessionDataSet>
createSessionDataSet(const std::string& sql, const std::vector<std::string>& columnNameList,
const std::vector<std::string>& columnTypeList,
const std::map<std::string, int32_t>& columnNameIndex, int64_t queryId,
int64_t statementId, std::shared_ptr<IClientRPCServiceClient> client,
int64_t sessionId, const std::vector<std::string>& queryResult,
bool ignoreTimestamp, int64_t timeout, bool moreData, int32_t fetchSize,
const std::string& zoneId, int32_t timeFactor,
std::vector<int32_t>& columnIndex2TsBlockColumnIndexList) {
auto dataSet = std::unique_ptr<SessionDataSet>(new SessionDataSet());
dataSet->impl_ = std::unique_ptr<SessionDataSet::Impl>(new SessionDataSet::Impl());
dataSet->impl_->iotdbRpcDataSet_ = std::make_shared<IoTDBRpcDataSet>(
sql, columnNameList, columnTypeList, columnNameIndex, ignoreTimestamp, moreData, queryId,
statementId, client, sessionId, queryResult, fetchSize, timeout, zoneId,
IoTDBRpcDataSet::DEFAULT_TIME_FORMAT, timeFactor, columnIndex2TsBlockColumnIndexList);
return dataSet;
}
/** Creates a record carrying only a timestamp; the field list starts out empty. */
RowRecord::RowRecord(int64_t timestamp)
    : timestamp(timestamp) {}
/**
 * Creates a record from a timestamp and a list of fields.
 *
 * @param timestamp value stored as the record's timestamp.
 * @param fields    fields copied into the record.
 */
RowRecord::RowRecord(int64_t timestamp, const std::vector<Field>& fields)
    : timestamp(timestamp),
      fields(fields) {
}
/**
 * Creates a record without a timestamp: the timestamp is set to -1, the value
 * toString() treats as "no timestamp".
 *
 * @param fields fields copied into the record.
 */
RowRecord::RowRecord(const std::vector<Field>& fields)
    : RowRecord(static_cast<int64_t>(-1), fields) {}
/** Creates an empty record whose timestamp is -1 (omitted by toString()). */
RowRecord::RowRecord()
    : timestamp(-1) {}
void RowRecord::addField(const Field& f) {
this->fields.push_back(f);
}
std::string RowRecord::toString() {
std::string ret;
if (this->timestamp != -1) {
ret.append(std::to_string(timestamp));
ret.append("\t");
}
for (size_t i = 0; i < fields.size(); i++) {
if (i != 0) {
ret.append("\t");
}
const Field& f = fields[i];
switch (f.dataType) {
case TSDataType::BOOLEAN:
if (f.isNull()) {
ret.append("null");
} else {
ret.append(f.boolV.value() ? "true" : "false");
}
break;
case TSDataType::INT32:
if (f.isNull()) {
ret.append("null");
} else {
ret.append(std::to_string(f.intV.value()));
}
break;
case TSDataType::DATE:
if (f.isNull()) {
ret.append("null");
} else {
ret.append(f.dateV.value().toIsoExtendedString());
}
break;
case TSDataType::TIMESTAMP:
case TSDataType::INT64:
if (f.isNull()) {
ret.append("null");
} else {
ret.append(std::to_string(f.longV.value()));
}
break;
case TSDataType::FLOAT:
if (f.isNull()) {
ret.append("null");
} else {
ret.append(std::to_string(f.floatV.value()));
}
break;
case TSDataType::DOUBLE:
if (f.isNull()) {
ret.append("null");
} else {
ret.append(std::to_string(f.doubleV.value()));
}
break;
case TSDataType::BLOB:
case TSDataType::STRING:
case TSDataType::TEXT:
if (f.isNull()) {
ret.append("null");
} else {
ret.append(f.stringV.value());
}
break;
case TSDataType::OBJECT:
if (!f.stringV.is_initialized()) {
ret.append("null");
} else {
ret.append(f.stringV.value());
}
break;
default:
break;
}
}
ret.append("\n");
return ret;
}
// Defaulted in this file, after SessionDataSet::Impl has been defined above.
// NOTE(review): presumably required because impl_ is a std::unique_ptr to a type the
// header only forward-declares -- confirm against SessionDataSet.h.
SessionDataSet::~SessionDataSet() = default;
/**
 * Reports whether another row is available.
 *
 * @return true if the underlying dataset already holds a cached record; otherwise the
 *         result of advancing the underlying dataset with next().
 */
bool SessionDataSet::hasNext() {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    // Short-circuit: the dataset is only advanced when no record is cached.
    return rpcDataSet.hasCachedRecord() || rpcDataSet.next();
}
/**
 * Returns the next row as a RowRecord.
 *
 * @return nullptr when no record is cached and hasNext() reports false; otherwise the
 *         record built from the current row, after the cached-record flag is cleared.
 */
shared_ptr<RowRecord> SessionDataSet::next() {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;

    const bool exhausted = !rpcDataSet.hasCachedRecord() && !hasNext();
    if (exhausted) {
        return nullptr;
    }

    // The cached row is consumed by this call.
    rpcDataSet.setHasCachedRecord(false);
    return constructRowRecordFromValueArray();
}
/** Returns the fetch size reported by the underlying IoTDBRpcDataSet. */
int SessionDataSet::getFetchSize() {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    return rpcDataSet.getFetchSize();
}
/** Passes the given fetch size on to the underlying IoTDBRpcDataSet. */
void SessionDataSet::setFetchSize(int fetchSize) {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    rpcDataSet.setFetchSize(fetchSize);
}
/** Returns the column name list held by the underlying IoTDBRpcDataSet. */
const std::vector<std::string>& SessionDataSet::getColumnNames() const {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    return rpcDataSet.getColumnNameList();
}
/** Returns the column type list held by the underlying IoTDBRpcDataSet. */
const std::vector<std::string>& SessionDataSet::getColumnTypeList() const {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    return rpcDataSet.getColumnTypeList();
}
/**
 * Closes the underlying IoTDBRpcDataSet.
 *
 * @param forceClose forwarded unchanged to IoTDBRpcDataSet::close().
 */
void SessionDataSet::closeOperationHandle(bool forceClose) {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    rpcDataSet.close(forceClose);
}
/** Creates an iterator over the given implementation object; the pointer is moved into the iterator. */
SessionDataSet::DataIterator::DataIterator(std::shared_ptr<Impl> impl)
    : impl_(std::move(impl)) {
}
/** Advances the underlying IoTDBRpcDataSet and returns the result of its next(). */
bool SessionDataSet::DataIterator::next() {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    return rpcDataSet.next();
}
/** Returns IoTDBRpcDataSet::isNullByColumnName() for the named column. */
bool SessionDataSet::DataIterator::isNull(const std::string& columnName) {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    return rpcDataSet.isNullByColumnName(columnName);
}
/** Returns IoTDBRpcDataSet::isNullByIndex() for the column index, which is passed through unchanged. */
bool SessionDataSet::DataIterator::isNullByIndex(int32_t columnIndex) {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    return rpcDataSet.isNullByIndex(columnIndex);
}
/** Reads the column at columnIndex as a boolean by forwarding to IoTDBRpcDataSet::getBooleanByIndex(). */
Optional<bool> SessionDataSet::DataIterator::getBooleanByIndex(int32_t columnIndex) {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    return rpcDataSet.getBooleanByIndex(columnIndex);
}
/** Reads the named column as a boolean by forwarding to IoTDBRpcDataSet::getBoolean(). */
Optional<bool> SessionDataSet::DataIterator::getBoolean(const std::string& columnName) {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    return rpcDataSet.getBoolean(columnName);
}
/** Reads the column at columnIndex as a double by forwarding to IoTDBRpcDataSet::getDoubleByIndex(). */
Optional<double> SessionDataSet::DataIterator::getDoubleByIndex(int32_t columnIndex) {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    return rpcDataSet.getDoubleByIndex(columnIndex);
}
/** Reads the named column as a double by forwarding to IoTDBRpcDataSet::getDouble(). */
Optional<double> SessionDataSet::DataIterator::getDouble(const std::string& columnName) {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    return rpcDataSet.getDouble(columnName);
}
/** Reads the column at columnIndex as a float by forwarding to IoTDBRpcDataSet::getFloatByIndex(). */
Optional<float> SessionDataSet::DataIterator::getFloatByIndex(int32_t columnIndex) {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    return rpcDataSet.getFloatByIndex(columnIndex);
}
/** Reads the named column as a float by forwarding to IoTDBRpcDataSet::getFloat(). */
Optional<float> SessionDataSet::DataIterator::getFloat(const std::string& columnName) {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    return rpcDataSet.getFloat(columnName);
}
/** Reads the column at columnIndex as a 32-bit integer by forwarding to IoTDBRpcDataSet::getIntByIndex(). */
Optional<int32_t> SessionDataSet::DataIterator::getIntByIndex(int32_t columnIndex) {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    return rpcDataSet.getIntByIndex(columnIndex);
}
/** Reads the named column as a 32-bit integer by forwarding to IoTDBRpcDataSet::getInt(). */
Optional<int32_t> SessionDataSet::DataIterator::getInt(const std::string& columnName) {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    return rpcDataSet.getInt(columnName);
}
/** Reads the column at columnIndex as a 64-bit integer by forwarding to IoTDBRpcDataSet::getLongByIndex(). */
Optional<int64_t> SessionDataSet::DataIterator::getLongByIndex(int32_t columnIndex) {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    return rpcDataSet.getLongByIndex(columnIndex);
}
/** Reads the named column as a 64-bit integer by forwarding to IoTDBRpcDataSet::getLong(). */
Optional<int64_t> SessionDataSet::DataIterator::getLong(const std::string& columnName) {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    return rpcDataSet.getLong(columnName);
}
/** Reads the column at columnIndex as a string by forwarding to IoTDBRpcDataSet::getStringByIndex(). */
Optional<std::string> SessionDataSet::DataIterator::getStringByIndex(int32_t columnIndex) {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    return rpcDataSet.getStringByIndex(columnIndex);
}
/** Reads the named column as a string by forwarding to IoTDBRpcDataSet::getString(). */
Optional<std::string> SessionDataSet::DataIterator::getString(const std::string& columnName) {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    return rpcDataSet.getString(columnName);
}
/** Reads the column at columnIndex as a timestamp by forwarding to IoTDBRpcDataSet::getTimestampByIndex(). */
Optional<int64_t> SessionDataSet::DataIterator::getTimestampByIndex(int32_t columnIndex) {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    return rpcDataSet.getTimestampByIndex(columnIndex);
}
/** Reads the named column as a timestamp by forwarding to IoTDBRpcDataSet::getTimestamp(). */
Optional<int64_t> SessionDataSet::DataIterator::getTimestamp(const std::string& columnName) {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    return rpcDataSet.getTimestamp(columnName);
}
/** Reads the column at columnIndex as a date by forwarding to IoTDBRpcDataSet::getDateByIndex(). */
Optional<IoTDBDate> SessionDataSet::DataIterator::getDateByIndex(int32_t columnIndex) {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    return rpcDataSet.getDateByIndex(columnIndex);
}
/** Reads the named column as a date by forwarding to IoTDBRpcDataSet::getDate(). */
Optional<IoTDBDate> SessionDataSet::DataIterator::getDate(const std::string& columnName) {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    return rpcDataSet.getDate(columnName);
}
/** Returns whatever IoTDBRpcDataSet::findColumn() yields for the given column name. */
int32_t SessionDataSet::DataIterator::findColumn(const std::string& columnName) {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    return rpcDataSet.findColumn(columnName);
}
/** Returns the column name list held by the underlying IoTDBRpcDataSet. */
const std::vector<std::string>& SessionDataSet::DataIterator::getColumnNames() const {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    return rpcDataSet.getColumnNameList();
}
/** Returns the column type list held by the underlying IoTDBRpcDataSet. */
const std::vector<std::string>& SessionDataSet::DataIterator::getColumnTypeList() const {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    return rpcDataSet.getColumnTypeList();
}
/**
 * Returns a DataIterator that reads from this dataset's implementation object.
 *
 * The iterator receives a shared_ptr with a no-op deleter wrapped around impl_.get(), so it
 * does not own the Impl. The iterator therefore must not be used after this
 * SessionDataSet (which owns the Impl through impl_) has been destroyed.
 */
SessionDataSet::DataIterator SessionDataSet::getIterator() {
    // Deleter that intentionally does nothing: ownership stays with impl_.
    const auto leaveAlive = [](Impl*) {};
    std::shared_ptr<Impl> borrowed(impl_.get(), leaveAlive);
    return DataIterator(std::move(borrowed));
}
/**
 * Builds a RowRecord from the row the underlying IoTDBRpcDataSet currently exposes.
 *
 * Walks the columns from getValueColumnStartIndex() up to (not including) getColumnSize().
 * For each non-null column the Field's dataType is set and the matching typed slot is
 * filled; a null column yields a default-constructed Field.
 *
 * NOTE(review): for null columns dataType keeps Field's default value. If that default is
 * not one of the types RowRecord::toString() handles, such a cell is printed as empty text
 * rather than "null" -- confirm this is intended.
 *
 * @return a RowRecord holding getCurrentRowTime() and one Field per value column.
 * @throws UnSupportedDataTypeException if a non-null column has a data type not handled below.
 */
shared_ptr<RowRecord> SessionDataSet::constructRowRecordFromValueArray() {
    auto& rpcDataSet = *impl_->iotdbRpcDataSet_;
    // Bind the name list once; a const reference also keeps a by-value result alive.
    const std::vector<std::string>& columnNames = rpcDataSet.getColumnNameList();

    std::vector<Field> outFields;
    for (int i = rpcDataSet.getValueColumnStartIndex(); i < rpcDataSet.getColumnSize(); i++) {
        Field field;
        // Reference instead of a per-column std::string copy; at() keeps the bounds check.
        const std::string& columnName = columnNames.at(i);
        if (!rpcDataSet.isNullByColumnName(columnName)) {
            const TSDataType::TSDataType dataType = rpcDataSet.getDataType(columnName);
            field.dataType = dataType;
            switch (dataType) {
                case TSDataType::BOOLEAN:
                    field.boolV = rpcDataSet.getBoolean(columnName);
                    break;
                case TSDataType::INT32:
                    field.intV = rpcDataSet.getInt(columnName);
                    break;
                case TSDataType::DATE:
                    field.dateV = rpcDataSet.getDate(columnName);
                    break;
                case TSDataType::INT64:
                case TSDataType::TIMESTAMP:
                    field.longV = rpcDataSet.getLong(columnName);
                    break;
                case TSDataType::FLOAT:
                    field.floatV = rpcDataSet.getFloat(columnName);
                    break;
                case TSDataType::DOUBLE:
                    field.doubleV = rpcDataSet.getDouble(columnName);
                    break;
                case TSDataType::TEXT:
                case TSDataType::BLOB:
                case TSDataType::STRING:
                case TSDataType::OBJECT:
                    // All textual/binary types are stored through the string slot.
                    field.stringV = rpcDataSet.getBinary(columnName)->getStringValue();
                    break;
                default:
                    throw UnSupportedDataTypeException("Data type is not supported.");
            }
        }
        // The local Field is not used again, so move it rather than copying it.
        outFields.emplace_back(std::move(field));
    }
    return std::make_shared<RowRecord>(rpcDataSet.getCurrentRowTime(), outFields);
}