blob: c215f965086f92c710db7bf3b426713de612806c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "storage/cwrapper/orc-format-c.h"
#include <uuid/uuid.h>
#include <memory>
#include <string>
#include <vector>
#include "dbcommon/common/vector-transformer.h"
#include "dbcommon/common/vector/decimal-vector.h"
#include "dbcommon/common/vector/timestamp-vector.h"
#include "dbcommon/filesystem/file-system.h"
#include "dbcommon/function/decimal-function.h"
#include "dbcommon/function/typecast-func.cg.h"
#include "dbcommon/type/date.h"
#include "dbcommon/type/decimal.h"
#include "dbcommon/type/type-kind.h"
#include "dbcommon/utils/global.h"
#include "dbcommon/utils/url.h"
#include "storage/format/format.h"
#include "storage/format/orc/orc-format.h"
#include "univplan/univplanbuilder/univplanbuilder-scan-task.h"
// Constants for the decimal encoding built by decimalGetValueBuffer
// (NumericTransData header + base-10^DEC_DIGITS digit groups).
// NOTE(review): these look modeled on PostgreSQL's NUMERIC layout, but the
// dscale mask here is 0x3FF -- confirm against the consumer on the C side.
// Sign bits stored in sign_dscale for a non-negative value.
#define NUMERIC_POS 0x0000
// Sign bits stored in sign_dscale for a negative value.
#define NUMERIC_NEG 0x4000
// Decimal digits packed into each int16_t digit group.
#define DEC_DIGITS 4
// Bits of sign_dscale that hold the scale.
#define NUMERIC_DSCALE_MASK 0x3FF
// Header bytes before the digit groups: varlen + sign_dscale + weight.
#define NUMERIC_HDRSZ (sizeof(int32_t) + sizeof(uint16_t) + sizeof(int16_t))
#ifdef __cplusplus
extern "C" {
#endif
// Records an error code and message into `ce`. Defined further below;
// declared here so the exported entry points can call it from their
// catch handlers.
static void ORCFormatSetErrorORCFormatC(ORCFormatCatchedError *ce, int errCode,
const char *errMsg);
// Per-column cursor used to hand the values of one projected column to the
// C caller, row by row, out of the current tuple batch.
//
// Members carry default initializers because readers are allocated with
// `new OrcColumnReader` (default-initialization), which would otherwise leave
// the raw pointers and `type` indeterminate. `lens`, for example, is only
// ever assigned for text-like columns.
typedef struct OrcColumnReader {
  // Column type; selects how values are decoded and how far `value` advances.
  dbcommon::TypeKind type = dbcommon::TypeKind::INVALIDTYPEID;
  // Cursor to the next value to hand out. Points either into the batch's
  // vector (fixed-width types) or into `valBuffer`.
  const char *value = nullptr;
  // Per-row null flags. nullptr means no row is null.
  const bool *nulls = nullptr;
  // Per-row payload lengths. Only set for text-like columns.
  const uint64_t *lens = nullptr;
  // Scratch buffer holding re-encoded values for text, binary, timestamp and
  // decimal columns. nullptr for fixed-width columns.
  std::unique_ptr<dbcommon::ByteBuffer> valBuffer;
} OrcColumnReader;
// State behind one ORCFormatC handle. It is shared by the scan entry points
// (Begin/Next/Rescan/End) and the insert entry points
// (BeginInsert/Insert/EndInsert) below.
struct ORCFormatC {
// Underlying ORC reader/writer; created in ORCFormatNewORCFormatC.
std::unique_ptr<storage::ORCFormat> orcFormat; // NOLINT
// URL of the hidden directory an insert writes into; set by BeginInsert.
dbcommon::URL::uptr url;
// Holds "table.options"; orcFormat is constructed with a pointer to it.
dbcommon::Parameters params;
// Table schema built from the column arrays passed to Begin/BeginInsert.
dbcommon::TupleDesc desc;
// Last error recorded by an entry point (see ORCFormatGetErrorORCFormatC).
ORCFormatCatchedError error;
// Per-column projection flags passed to beginScan.
std::vector<bool> columnsToRead;
// Scan tasks passed to beginScan (Begin adds one task holding every split).
univplan::UnivPlanScanFileSplitListList splits;
// Current batch: the one being read (scan) or being filled (insert).
dbcommon::TupleBatch::uptr tb;
// "/<segno + 1>_<uuid>" name of the file being inserted.
std::string insertFileName;
// True when ORCFormatNextORCFormatC must fetch a new batch before reading.
bool needNewTupleBatch;
// Number of rows of `tb` already returned by ORCFormatNextORCFormatC.
uint64_t rowRead;
// Number of rows in `tb`.
uint64_t rowCount;
// One reader per projected column, parallel to colToReadIds.
std::vector<std::unique_ptr<OrcColumnReader>> columnReaders;
// Schema index of each projected column.
std::vector<uint32_t> colToReadIds;
};
// Fixed header of one decimal value as emitted by decimalGetValueBuffer.
// The NUMERIC_HDRSZ-byte header is followed directly by base-10000 digit
// groups, stored most significant first.
typedef struct NumericTransData {
int32_t varlen; // total record size in bytes, header included
// Index of the most significant digit group relative to the decimal point:
// (number of integral groups - 1), or -1 when there is no integral part.
int16_t weight;
// NUMERIC_POS/NUMERIC_NEG sign bits OR'd with the scale (NUMERIC_DSCALE_MASK).
uint16_t sign_dscale;
// Zero-length trailing array (GNU extension) naming the digit groups that
// follow the header in memory.
int16_t digits[0];
} NumericTransData;
// Creates a new ORC format handle for segment `segno`.
//
// `tableOptions` is stored as the "table.options" parameter that the
// underlying storage::ORCFormat is constructed with. The returned handle is
// owned by the caller and must be released with ORCFormatFreeORCFormatC().
// (The previous version declared an unused UNIVPLANFORMATTYPE local; it has
// been removed.)
ORCFormatC *ORCFormatNewORCFormatC(const char *tableOptions, int segno) {
  ORCFormatC *instance = new ORCFormatC();
  instance->params.set("table.options", tableOptions);
  // orcFormat receives a pointer into the same instance, so both share the
  // handle's lifetime.
  instance->orcFormat.reset(new storage::ORCFormat(&(instance->params)));
  instance->orcFormat->setFileSystemManager(&FSManager);
  instance->tb = nullptr;
  instance->url = nullptr;
  instance->error.errCode = ERRCODE_SUCCESSFUL_COMPLETION;
  // Insert file names take the form "/<segno + 1>_<uuid>". The uuid part is
  // appended by ORCFormatBeginInsertORCFormatC.
  instance->insertFileName = "/" + std::to_string(segno + 1) + "_";
  return instance;
}
// Starts a scan over `numSplits` file splits.
//
// The schema is built from the `numColumns` entries of columnName,
// columnDatatype and columnDatatypeMod. columnsToRead[i] marks the projected
// columns. Each projected column gets an OrcColumnReader; text, binary,
// timestamp and decimal readers also get a pre-reserved value buffer. All
// splits are placed in a single scan task.
// A dbcommon::TransactionAbortException is recorded in fmt->error.
void ORCFormatBeginORCFormatC(ORCFormatC *fmt, ORCFormatFileSplit *splits,
                              int numSplits, bool *columnsToRead,
                              char **columnName, int *columnDatatype,
                              uint64_t *columnDatatypeMod, int numColumns) {
  try {
    fmt->tb = nullptr;
    fmt->needNewTupleBatch = true;

    for (int col = 0; col < numColumns; ++col) {
      const bool projected = columnsToRead[col];
      const dbcommon::TypeKind kind =
          static_cast<dbcommon::TypeKind>(columnDatatype[col]);
      fmt->columnsToRead.push_back(projected);
      fmt->desc.add(columnName[col], kind, columnDatatypeMod[col]);
      if (!projected) continue;

      std::unique_ptr<OrcColumnReader> reader(new OrcColumnReader);
      reader->type = kind;

      // Types whose values are re-encoded into a scratch buffer per batch.
      bool needsValueBuffer = false;
      switch (kind) {
        case dbcommon::TypeKind::STRINGID:
        case dbcommon::TypeKind::CHARID:
        case dbcommon::TypeKind::VARCHARID:
        case dbcommon::TypeKind::BINARYID:
        case dbcommon::TypeKind::TIMESTAMPID:
        case dbcommon::TypeKind::TIMESTAMPTZID:
        case dbcommon::TypeKind::DECIMALID:
          needsValueBuffer = true;
          break;
        default:
          break;
      }
      if (needsValueBuffer) {
        reader->valBuffer.reset(new dbcommon::ByteBuffer(true));
        reader->valBuffer->reserve(DEFAULT_RESERVED_SIZE_OF_STRING *
                                   DEFAULT_NUMBER_TUPLES_PER_BATCH);
      } else {
        reader->valBuffer = nullptr;
      }

      fmt->columnReaders.push_back(std::move(reader));
      fmt->colToReadIds.push_back(col);
    }

    // A single scan task carries every split.
    univplan::UnivPlanBuilderScanTask taskBuilder;
    for (int s = 0; s < numSplits; ++s) {
      const ORCFormatFileSplit &split = splits[s];
      // -1, -1: no range id and no row-group id.
      taskBuilder.addScanFileSplit(split.fileName, split.start, split.len, -1,
                                   -1);
    }

    // Transfer the built split table from the builder into the handle.
    std::unique_ptr<univplan::UnivPlanScanFileSplitListTb> task(
        new univplan::UnivPlanScanFileSplitListTb(
            std::move(taskBuilder.releaseSplitsTb())));
    fmt->splits.push_back(std::move(task));

    fmt->orcFormat->beginScan(&fmt->splits, &fmt->desc, &fmt->columnsToRead,
                              nullptr, nullptr, false);
  } catch (dbcommon::TransactionAbortException &e) {
    ORCFormatSetErrorORCFormatC(&fmt->error, e.errCode(), e.what());
  }
}
// Restarts the current scan via storage::ORCFormat::reScan().
// A dbcommon::TransactionAbortException is recorded in fmt->error rather
// than propagated to the C caller.
void ORCFormatRescanORCFormatC(ORCFormatC *fmt) {
  try {
    storage::ORCFormat *format = fmt->orcFormat.get();
    format->reScan();
  } catch (dbcommon::TransactionAbortException &e) {
    ORCFormatSetErrorORCFormatC(&fmt->error, e.errCode(), e.what());
  }
}
// Finishes the current scan via storage::ORCFormat::endScan().
// A dbcommon::TransactionAbortException is recorded in fmt->error rather
// than propagated to the C caller.
void ORCFormatEndORCFormatC(ORCFormatC *fmt) {
  try {
    storage::ORCFormat *format = fmt->orcFormat.get();
    format->endScan();
  } catch (dbcommon::TransactionAbortException &e) {
    ORCFormatSetErrorORCFormatC(&fmt->error, e.errCode(), e.what());
  }
}
// Prepares an insert into the table directory `dirFullPath`.
//
// Steps:
// 1. Build the schema from the `numColumns` column arrays.
// 2. Check that the hidden insert directory (dirFullPath + INSERT_HIDDEN_DIR)
//    exists. A missing directory raises ERRCODE_DATA_EXCEPTION via LOG_ERROR.
// 3. Open an ORC file in that directory named fmt->insertFileName extended
//    with a time-based UUID.
// A dbcommon::TransactionAbortException is recorded in fmt->error.
void ORCFormatBeginInsertORCFormatC(ORCFormatC *fmt, const char *dirFullPath,
                                    char **columnName, int *columnDatatype,
                                    uint64_t *columnDatatypeMod,
                                    int numColumns) {
  try {
    fmt->tb = nullptr;
    for (int col = 0; col < numColumns; ++col) {
      const dbcommon::TypeKind kind =
          static_cast<dbcommon::TypeKind>(columnDatatype[col]);
      fmt->desc.add(columnName[col], kind, columnDatatypeMod[col]);
    }

    // Files are written into a hidden sub-directory first; EndInsert renames
    // them into its parent directory.
    std::string hiddenDir = std::string(dirFullPath) + INSERT_HIDDEN_DIR;
    fmt->url.reset(new dbcommon::URL(hiddenDir));
    dbcommon::FileSystem *fs = FSManager.get(dirFullPath);
    const std::string targetPath = fmt->url->getPath();
    const std::string targetRawPath = fmt->url->getRawString();
    if (!fs->exists(targetPath.c_str())) {
      LOG_ERROR(ERRCODE_DATA_EXCEPTION, "no data directory found: %s",
                targetPath.c_str());
    }

    // Make the file name unique for this insertion.
    uuid_t uuid;
    char uuidText[1024];
    uuid_generate_time(uuid);
    uuid_unparse(uuid, uuidText);
    fmt->insertFileName += uuidText;

    fmt->orcFormat->beginInsert(targetRawPath + fmt->insertFileName,
                                fmt->desc);
  } catch (dbcommon::TransactionAbortException &e) {
    ORCFormatSetErrorORCFormatC(&fmt->error, e.errCode(), e.what());
  }
}
// Appends one row to the batch being built for the current insert.
//
// Per-column inputs for column i:
// - datatypes[i]: its dbcommon::TypeKind.
// - values[i]: points at the value.
// - isNull[i]: flags a null.
// - lens[i], nullBitmap[i], dims[i]: read only for array columns.
// Once the batch holds storage::Format::kTuplesPerBatch rows, it is handed to
// the ORC writer and a fresh batch is started on the next call.
// A dbcommon::TransactionAbortException (including the unsupported-type
// errors raised below) is recorded in fmt->error.
void ORCFormatInsertORCFormatC(ORCFormatC *fmt, int *datatypes, char **values,
                               uint64_t *lens, unsigned char **nullBitmap,
                               int32_t **dims, bool *isNull) {
  try {
    if (fmt->tb == nullptr)
      fmt->tb.reset(new dbcommon::TupleBatch(fmt->desc, true));
    dbcommon::TupleBatchWriter &writers = fmt->tb->getTupleBatchWriter();
    int natts = fmt->desc.getNumOfColumns();
    for (int i = 0; i < natts; ++i) {
      dbcommon::TypeKind datatype =
          static_cast<dbcommon::TypeKind>(datatypes[i]);
      switch (datatype) {
        case dbcommon::TypeKind::BOOLEANID:
          writers[i]->append(values[i], sizeof(bool), isNull[i]);
          break;
        case dbcommon::TypeKind::TINYINTID:
          writers[i]->append(values[i], sizeof(int8_t), isNull[i]);
          break;
        case dbcommon::TypeKind::SMALLINTID:
          writers[i]->append(values[i], sizeof(int16_t), isNull[i]);
          break;
        case dbcommon::TypeKind::INTID:
        case dbcommon::TypeKind::DATEID:
          writers[i]->append(values[i], sizeof(int32_t), isNull[i]);
          break;
        case dbcommon::TypeKind::BIGINTID:
        case dbcommon::TypeKind::TIMEID:
          writers[i]->append(values[i], sizeof(int64_t), isNull[i]);
          break;
        case dbcommon::TypeKind::FLOATID:
          writers[i]->append(values[i], sizeof(float), isNull[i]);
          break;
        case dbcommon::TypeKind::DOUBLEID:
          writers[i]->append(values[i], sizeof(double), isNull[i]);
          break;
        case dbcommon::TypeKind::CHARID:
        case dbcommon::TypeKind::VARCHARID:
        case dbcommon::TypeKind::STRINGID:
        case dbcommon::TypeKind::BINARYID:
        case dbcommon::TypeKind::DECIMALID:
          // No explicit length is passed here.
          // NOTE(review): the vector presumably derives it from the value
          // itself; confirm.
          writers[i]->append(values[i], isNull[i]);
          break;
        case dbcommon::TypeKind::TIMESTAMPID:
        case dbcommon::TypeKind::TIMESTAMPTZID:
          // Two int64_t words per value.
          writers[i]->append(values[i], sizeof(int64_t) + sizeof(int64_t),
                             isNull[i]);
          break;
        case dbcommon::TypeKind::SMALLINTARRAYID:
        case dbcommon::TypeKind::INTARRAYID:
        case dbcommon::TypeKind::BIGINTARRAYID:
        case dbcommon::TypeKind::FLOATARRAYID:
        case dbcommon::TypeKind::DOUBLEARRAYID: {
          // Array columns are backed by a ListVector. static_cast performs
          // the proper base-to-derived pointer conversion; reinterpret_cast
          // only worked while Vector happened to sit at offset 0.
          dbcommon::ListVector *lwriter =
              static_cast<dbcommon::ListVector *>(writers[i].get());
          lwriter->append(values[i], lens[i], nullBitmap[i], dims[i],
                          isNull[i], true);
          break;
        }
        case dbcommon::TypeKind::INVALIDTYPEID:
          LOG_ERROR(ERRCODE_DATA_EXCEPTION, "data type with id %d is invalid",
                    static_cast<int>(datatype));
          break;
        default:
          LOG_ERROR(ERRCODE_DATA_EXCEPTION,
                    "data type with id %d is not supported yet",
                    static_cast<int>(datatype));
          break;
      }
    }
    fmt->tb->incNumOfRows(1);
    // Flush full batches; the next call starts a new one.
    if (fmt->tb->getNumOfRows() >= storage::Format::kTuplesPerBatch) {
      fmt->orcFormat->doInsert(std::move(fmt->tb));
      fmt->tb = nullptr;
    }
  } catch (dbcommon::TransactionAbortException &e) {
    ORCFormatSetErrorORCFormatC(&(fmt->error), e.errCode(), e.what());
  }
}
// Finishes the current insert.
//
// Flushes any pending rows, closes the ORC file, and renames it from the
// hidden insert directory (fmt->url) to fmt->url's path + "/..", i.e. the
// hidden directory's parent.
// A dbcommon::TransactionAbortException is recorded in fmt->error.
void ORCFormatEndInsertORCFormatC(ORCFormatC *fmt) {
  try {
    if (fmt->tb != nullptr) {
      fmt->orcFormat->doInsert(std::move(fmt->tb));  // NOLINT
    }
    fmt->orcFormat->endInsert();

    dbcommon::FileSystem *fs = FSManager.get(fmt->url->getRawString());
    const std::string from = fmt->url->getPath() + fmt->insertFileName;
    const std::string to = fmt->url->getPath() + "/.." + fmt->insertFileName;
    fs->rename(from.c_str(), to.c_str());
  } catch (dbcommon::TransactionAbortException &e) {
    ORCFormatSetErrorORCFormatC(&fmt->error, e.errCode(), e.what());
  }
}
// Destroys the handle stored in *fmt, if any, and clears *fmt so the caller's
// pointer does not dangle. Calling it again on the same slot is a no-op.
void ORCFormatFreeORCFormatC(ORCFormatC **fmt) {
  ORCFormatC *handle = *fmt;
  if (handle != nullptr) {
    delete handle;
    *fmt = nullptr;
  }
}
// Returns the handle's error slot. Its errCode starts as
// ERRCODE_SUCCESSFUL_COMPLETION and is overwritten whenever an entry point
// catches a failure; it is never reset by this file.
ORCFormatCatchedError *ORCFormatGetErrorORCFormatC(ORCFormatC *fmt) {
  ORCFormatCatchedError *slot = &fmt->error;
  return slot;
}
// Stores `errCode` and a copy of `errMsg` into `ce`.
//
// The message is truncated to fit ce->errMessage and is always
// NUL-terminated. A null `errMsg` is stored as an empty string.
// NOTE(review): relies on errMessage being a fixed-size char array, so that
// sizeof yields its capacity.
void ORCFormatSetErrorORCFormatC(ORCFormatCatchedError *ce, int errCode,
                                 const char *errMsg) {
  assert(ce != nullptr);
  ce->errCode = errCode;
  // Bound the copy by the destination size. The previous bound,
  // strlen(errMsg) + 1, was the source length and let long exception
  // messages overflow errMessage.
  snprintf(ce->errMessage, sizeof(ce->errMessage), "%s",
           errMsg != nullptr ? errMsg : "");
}
// Packs the non-null values of a text-like column into reader->valBuffer as
// consecutive [uint32_t length][bytes] records, then points reader->value at
// the first record.
//
// reader->lens keeps the vector's per-row lengths. reader->nulls receives
// the per-row null flags, or nullptr when the vector has no nulls. Null rows
// contribute nothing to the buffer.
static void textRelatedGetValueBuffer(ORCFormatC *fmt, dbcommon::BytesVector *v,
                                      OrcColumnReader *reader) {
  const bool anyNull = v->hasNullValue();
  const uint64_t *rowLens = v->getLengths();
  const char **rowVals = v->getValPtrs();
  dbcommon::ByteBuffer *out = reader->valBuffer.get();

  out->clear();
  reader->lens = rowLens;
  const bool *nullFlags = nullptr;
  if (anyNull) nullFlags = v->getNullBuffer()->getBools();

  for (uint64_t row = 0; row < fmt->rowCount; ++row) {
    if (anyNull && nullFlags[row]) continue;
    uint32_t len = rowLens[row];
    out->append(len);
    out->append(rowVals[row], len);
  }

  reader->nulls = nullFlags;
  // Taken last: appends may have moved the buffer's storage.
  reader->value = out->data();
}
// Re-encodes the non-null values of a timestamp column into reader->valBuffer
// as consecutive int64_t values, then points reader->value at the first one.
//
// Each value is
//   (seconds - TIMESTAMP_EPOCH_JDATE) * 1000000 + nanoseconds / 1000,
// i.e. microseconds after shifting the epoch by TIMESTAMP_EPOCH_JDATE
// seconds.
// NOTE(review): presumably this converts to the consumer's timestamp epoch;
// confirm.
// Null rows contribute nothing. reader->nulls receives the per-row null
// flags, or nullptr when the vector has no nulls.
// (The previous version fetched getValPtrs() into an unused local; removed.)
static void timestampGetValueBuffer(ORCFormatC *fmt,
                                    dbcommon::TimestampVector *v,
                                    OrcColumnReader *reader) {
  const bool hasNull = v->hasNullValue();
  const int64_t *second = reinterpret_cast<const int64_t *>(v->getValue());
  const int64_t *nanosecond =
      reinterpret_cast<const int64_t *>(v->getNanoseconds());
  auto toMicros = [second, nanosecond](uint64_t row) -> int64_t {
    return (second[row] - TIMESTAMP_EPOCH_JDATE) * 1000000 +
           nanosecond[row] / 1000;
  };

  reader->valBuffer->clear();
  const bool *nulls = hasNull ? v->getNullBuffer()->getBools() : nullptr;
  for (uint64_t i = 0; i < fmt->rowCount; ++i) {
    if (hasNull && nulls[i]) continue;
    int64_t val = toMicros(i);
    reader->valBuffer->append(val);
  }
  reader->nulls = nulls;
  // Taken last: appends may have moved the buffer's storage.
  reader->value = reader->valBuffer->data();
}
// Re-encodes a decimal column into reader->valBuffer as consecutive
// NumericTransData records (header followed by base-10000 digit groups, most
// significant first), then points reader->value at the first record.
//
// NOTE(review): dbcommon::transformVector presumably invokes the converter
// once per non-null row, in row order; confirm.
// ORCFormatNextORCFormatC assumes exactly one record per non-null row.
// reader->nulls is taken from srcVector->getNulls().
static void decimalGetValueBuffer(dbcommon::DecimalVector *srcVector,
OrcColumnReader *reader) {
dbcommon::DecimalVectorRawData src(srcVector);
// Appends the record for row `plainIdx` to the end of reader->valBuffer.
auto convertNumericTranData = [&](uint64_t plainIdx) {
NumericTransData numeric;
dbcommon::Int128 data(src.hightbits[plainIdx], src.lowbits[plainIdx]);
// The sign goes into sign_dscale; the digits below encode the magnitude.
numeric.sign_dscale = NUMERIC_POS;
if (data.isNegative()) {
numeric.sign_dscale = NUMERIC_NEG;
data = data.negate();
}
// Pad the fractional part with zeros so its digit count is a multiple of
// DEC_DIGITS, i.e. it fills whole int16_t digit groups.
int16_t scaleDigitCount = src.scales[plainIdx];
int16_t paddingDigitCount =
(DEC_DIGITS - scaleDigitCount % DEC_DIGITS) % DEC_DIGITS;
int16_t significantDigitCount = data.getNumOfDigit();
// True when the value has at least one digit left of the decimal point.
bool isPaddingSuffix = significantDigitCount > scaleDigitCount;
// Number of digits to encode after padding. A value with no integral digits
// is encoded with scale + padding digits (leading zeros included).
int16_t totalDigitCount = isPaddingSuffix
? significantDigitCount + paddingDigitCount
: scaleDigitCount + paddingDigitCount;
numeric.sign_dscale |= (scaleDigitCount & NUMERIC_DSCALE_MASK);
// weight = ceil(integral digits / DEC_DIGITS) - 1, or -1 when there is no
// integral part.
numeric.weight = isPaddingSuffix ? (totalDigitCount - scaleDigitCount -
paddingDigitCount + (DEC_DIGITS - 1)) /
DEC_DIGITS -
1
: -1;
// Header plus one int16_t per (rounded-up) group of DEC_DIGITS digits.
numeric.varlen =
NUMERIC_HDRSZ +
((totalDigitCount + DEC_DIGITS - 1) / DEC_DIGITS) * sizeof(int16_t);
// Reserve space for the whole record at the end of the buffer.
reader->valBuffer->resize(reader->valBuffer->size() + numeric.varlen);
// Fill header
*reinterpret_cast<NumericTransData *>(reader->valBuffer->tail() -
numeric.varlen) = numeric;
// Fill digits. Scale the magnitude by 10^padding, then peel off base-10000
// groups from the least significant end, writing backwards from the end of
// the record so the most significant group lands right after the header.
__int128_t dividend =
(__int128_t(data.getHighBits()) << 64) + __int128_t(data.getLowBits());
for (int i = 0; i < paddingDigitCount; i++) dividend *= 10;
int16_t *ptr = reinterpret_cast<int16_t *>(reader->valBuffer->tail());
for (int i = 0; i < (numeric.varlen - NUMERIC_HDRSZ) / sizeof(int16_t);
i++) {
int16_t remainder = dividend % 10000;
*--ptr = remainder;
dividend /= 10000;
}
// The backward walk must stop exactly at the end of the header.
assert(reinterpret_cast<char *>(ptr) ==
reader->valBuffer->tail() - numeric.varlen + NUMERIC_HDRSZ);
};
reader->valBuffer->clear();
dbcommon::transformVector(src.plainSize, src.sel, src.nulls,
convertNumericTranData);
reader->nulls = srcVector->getNulls();
// Taken last: resize() may have moved the buffer's storage.
reader->value = reader->valBuffer->data();
}
// Prepares every projected column's reader for the tuple batch in fmt->tb.
//
// - Text, binary, timestamp and decimal columns are re-encoded into the
//   reader's valBuffer by the *GetValueBuffer helpers.
// - Fixed-width columns point straight at the vector's value array.
// - Any other type raises ERRCODE_DATA_EXCEPTION via LOG_ERROR.
static void columnReadGetContent(ORCFormatC *fmt) {
  const dbcommon::TupleBatchReader &batch = fmt->tb->getTupleBatchReader();
  const size_t numProjected = fmt->colToReadIds.size();
  for (size_t k = 0; k < numProjected; ++k) {
    const uint32_t col = fmt->colToReadIds[k];
    OrcColumnReader *reader = fmt->columnReaders[k].get();
    switch (reader->type) {
      case dbcommon::TypeKind::STRINGID:
      case dbcommon::TypeKind::CHARID:
      case dbcommon::TypeKind::VARCHARID:
      case dbcommon::TypeKind::BINARYID:
        textRelatedGetValueBuffer(
            fmt, dynamic_cast<dbcommon::BytesVector *>(batch[col].get()),
            reader);
        break;
      case dbcommon::TypeKind::TIMESTAMPID:
      case dbcommon::TypeKind::TIMESTAMPTZID:
        timestampGetValueBuffer(
            fmt, dynamic_cast<dbcommon::TimestampVector *>(batch[col].get()),
            reader);
        break;
      case dbcommon::TypeKind::DECIMALID:
        decimalGetValueBuffer(
            dynamic_cast<dbcommon::DecimalVector *>(batch[col].get()), reader);
        break;
      case dbcommon::TypeKind::BOOLEANID:
      case dbcommon::TypeKind::SMALLINTID:
      case dbcommon::TypeKind::INTID:
      case dbcommon::TypeKind::BIGINTID:
      case dbcommon::TypeKind::FLOATID:
      case dbcommon::TypeKind::DOUBLEID:
      case dbcommon::TypeKind::DATEID:
      case dbcommon::TypeKind::TIMEID: {
        dbcommon::Vector *vec =
            dynamic_cast<dbcommon::Vector *>(batch[col].get());
        reader->nulls =
            vec->hasNullValue() ? vec->getNullBuffer()->getBools() : nullptr;
        reader->value = vec->getValue();
        break;
      }
      default:
        LOG_ERROR(ERRCODE_DATA_EXCEPTION, "not supported yet");
        break;
    }
  }
}
bool ORCFormatNextORCFormatC(ORCFormatC *fmt, const char **values,
uint64_t *lens, bool *nulls) {
try {
begin:
if (fmt->needNewTupleBatch) {
fmt->tb = fmt->orcFormat->next();
if (fmt->tb == nullptr) {
return false;
}
fmt->needNewTupleBatch = false;
fmt->rowRead = 0;
fmt->rowCount = fmt->tb->getNumOfRows();
if (fmt->rowCount > 0) columnReadGetContent(fmt);
}
if (fmt->rowRead < fmt->rowCount) {
int32_t colIndex = 0;
for (auto plainColIndex : fmt->colToReadIds) {
OrcColumnReader *reader = fmt->columnReaders[colIndex++].get();
switch (reader->type) {
case dbcommon::TypeKind::STRINGID:
case dbcommon::TypeKind::CHARID:
case dbcommon::TypeKind::VARCHARID:
case dbcommon::TypeKind::BINARYID: {
if (reader->nulls && reader->nulls[fmt->rowRead]) {
nulls[plainColIndex] = true;
} else {
nulls[plainColIndex] = false;
values[plainColIndex] = reader->value;
lens[plainColIndex] = reader->lens[fmt->rowRead] + 4;
reader->value += lens[plainColIndex];
}
break;
}
case dbcommon::TypeKind::BOOLEANID: {
if (reader->nulls && reader->nulls[fmt->rowRead]) {
nulls[plainColIndex] = true;
} else {
nulls[plainColIndex] = false;
values[plainColIndex] = reader->value;
}
reader->value += 1;
break;
}
case dbcommon::TypeKind::SMALLINTID: {
if (reader->nulls && reader->nulls[fmt->rowRead]) {
nulls[plainColIndex] = true;
} else {
nulls[plainColIndex] = false;
values[plainColIndex] = reader->value;
}
reader->value += 2;
break;
}
case dbcommon::TypeKind::INTID:
case dbcommon::TypeKind::FLOATID:
case dbcommon::TypeKind::DATEID: {
if (reader->nulls && reader->nulls[fmt->rowRead]) {
nulls[plainColIndex] = true;
} else {
nulls[plainColIndex] = false;
values[plainColIndex] = reader->value;
}
reader->value += 4;
break;
}
case dbcommon::TypeKind::BIGINTID:
case dbcommon::TypeKind::DOUBLEID:
case dbcommon::TypeKind::TIMEID: {
if (reader->nulls && reader->nulls[fmt->rowRead]) {
nulls[plainColIndex] = true;
} else {
nulls[plainColIndex] = false;
values[plainColIndex] = reader->value;
}
reader->value += 8;
break;
}
case dbcommon::TypeKind::TIMESTAMPID:
case dbcommon::TypeKind::TIMESTAMPTZID: {
if (reader->nulls && reader->nulls[fmt->rowRead]) {
nulls[plainColIndex] = true;
} else {
nulls[plainColIndex] = false;
values[plainColIndex] = reader->value;
reader->value += 8;
}
break;
}
case dbcommon::TypeKind::DECIMALID: {
if (reader->nulls && reader->nulls[fmt->rowRead]) {
nulls[plainColIndex] = true;
} else {
nulls[plainColIndex] = false;
values[plainColIndex] = reader->value;
lens[plainColIndex] =
(reinterpret_cast<const NumericTransData *>(reader->value))
->varlen;
reader->value += lens[plainColIndex];
}
break;
}
default: {
LOG_ERROR(ERRCODE_DATA_EXCEPTION, "not supported yet");
break;
}
}
}
++fmt->rowRead;
} else {
fmt->needNewTupleBatch = true;
goto begin;
}
return true;
} catch (dbcommon::TransactionAbortException &e) {
ORCFormatSetErrorORCFormatC(&(fmt->error), e.errCode(), e.what());
return false;
}
}
#ifdef __cplusplus
}
#endif