blob: c1494d320edfca5dedb1b0bc46c0e70072e26444 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "storage/row_cursor.h"
#include <glog/logging.h>
#include <stdlib.h>
#include <algorithm>
#include <new>
#include <numeric>
#include <ostream>
#include "common/cast_set.h"
#include "storage/field.h"
#include "storage/olap_common.h"
#include "storage/olap_define.h"
#include "storage/tablet/tablet_schema.h"
#include "util/slice.h"
using std::nothrow;
using std::string;
using std::vector;
namespace doris {
#include "common/compile_check_begin.h"
using namespace ErrorCode;
RowCursor::RowCursor()
: _fixed_len(0), _variable_len(0), _string_field_count(0), _long_text_buf(nullptr) {}
// Releases every buffer owned by this cursor.
// The fixed and variable buffers come from new[]; the long-text table and each
// of its entries come from malloc, so they are handed back with free().
RowCursor::~RowCursor() {
    delete[] _owned_fixed_buf;
    delete[] _variable_buf;

    // Nothing else to release when no STRING buffers were ever set up.
    if (_long_text_buf == nullptr || !(_string_field_count > 0)) {
        return;
    }

    // Walk the table front to back, freeing each per-column text buffer,
    // then free the table itself.
    char** const table_end = _long_text_buf + _string_field_count;
    for (char** slot = _long_text_buf; slot != table_end; ++slot) {
        free(*slot);
    }
    free(_long_text_buf);
}
// Common initialisation once _schema has been set.
//
// For every column id in `columns` this accumulates the column's variable
// length into _variable_len and counts STRING columns in _string_field_count.
// It then allocates a zero-filled fixed buffer of _schema->schema_size() bytes.
//
// Returns:
//   INIT_FAILED      if _schema has no column for one of the requested ids.
//   MEM_ALLOC_FAILED if the fixed buffer cannot be allocated.
Status RowCursor::_init(const std::vector<uint32_t>& columns) {
    _variable_len = 0;
    for (auto cid : columns) {
        if (_schema->column(cid) == nullptr) {
            // This is a schema lookup failure, not an allocation failure, so say so
            // and name the offending column id.
            return Status::Error<INIT_FAILED>("Fail to get column schema. cid={}", cid);
        }
        _variable_len += column_schema(cid)->get_variable_len();
        if (_schema->column(cid)->type() == FieldType::OLAP_FIELD_TYPE_STRING) {
            ++_string_field_count;
        }
    }

    _fixed_len = _schema->schema_size();
    // The trailing () value-initialises the array, i.e. the buffer starts zeroed.
    _fixed_buf = new (nothrow) char[_fixed_len]();
    if (_fixed_buf == nullptr) {
        return Status::Error<MEM_ALLOC_FAILED>("Fail to malloc _fixed_buf.");
    }
    // Remember that this buffer is ours so the destructor releases it.
    _owned_fixed_buf = _fixed_buf;
    return Status::OK();
}
// Initialises the cursor from an existing Schema: the cursor keeps its own
// copy of *shared_schema and then runs the common per-column setup.
Status RowCursor::_init(const std::shared_ptr<Schema>& shared_schema,
                        const std::vector<uint32_t>& columns) {
    _schema = std::make_unique<Schema>(*shared_schema);
    return _init(columns);
}
// Initialises the cursor from tablet columns: builds a fresh Schema from
// `schema` restricted to `columns`, then runs the common per-column setup.
Status RowCursor::_init(const std::vector<TabletColumnPtr>& schema,
                        const std::vector<uint32_t>& columns) {
    _schema = std::make_unique<Schema>(schema, columns);
    return _init(columns);
}
// Sizes and allocates the variable-length storage needed to hold `scan_keys`,
// then wires a Slice into each VARCHAR/CHAR/STRING cell of the fixed buffer so
// that the cell points at its backing storage.
//
// Must be called after one of the _init() overloads, since it relies on
// _schema and _fixed_buf being set up.
//
// Returns the error from _alloc_buf() if allocation fails, otherwise OK.
Status RowCursor::_init_scan_key(TabletSchemaSPtr schema,
                                 const std::vector<std::string>& scan_keys) {
    // NOTE: cid equal with column index
    // Hyperloglog cannot be key, no need to handle it

    // Pass 1: work out how much storage is needed.
    // Both counters are recomputed from scratch here. The preceding _init() call
    // has already counted STRING columns, so without resetting
    // _string_field_count the count would be doubled and _alloc_buf() would
    // allocate twice as many long-text buffers as the second pass consumes.
    _variable_len = 0;
    _string_field_count = 0;
    for (auto cid : _schema->column_ids()) {
        const TabletColumn& column = schema->column(cid);
        FieldType type = column.type();
        if (type == FieldType::OLAP_FIELD_TYPE_VARCHAR) {
            _variable_len += scan_keys[cid].length();
        } else if (type == FieldType::OLAP_FIELD_TYPE_CHAR ||
                   type == FieldType::OLAP_FIELD_TYPE_ARRAY) {
            // Reserve at least the declared column length, or more if the key is longer.
            _variable_len +=
                    std::max(scan_keys[cid].length(), static_cast<size_t>(column.length()));
        } else if (type == FieldType::OLAP_FIELD_TYPE_STRING) {
            ++_string_field_count;
        }
    }

    RETURN_IF_ERROR(_alloc_buf());

    // Pass 2: carve the freshly allocated storage into per-column slices.
    // Each slice is written at offset 1 of the column's cell
    // (NOTE(review): byte 0 is presumably the null flag -- confirm).
    char* variable_ptr = _variable_buf;
    char** long_text_ptr = _long_text_buf;
    for (auto cid : _schema->column_ids()) {
        const TabletColumn& column = schema->column(cid);
        char* fixed_ptr = _fixed_buf + _schema->column_offset(cid);
        FieldType type = column.type();
        if (type == FieldType::OLAP_FIELD_TYPE_VARCHAR) {
            // Use memcpy to avoid misaligned store on fixed_ptr + 1
            Slice slice(variable_ptr, scan_keys[cid].length());
            memcpy(fixed_ptr + 1, &slice, sizeof(Slice));
            variable_ptr += scan_keys[cid].length();
        } else if (type == FieldType::OLAP_FIELD_TYPE_CHAR) {
            // Use memcpy to avoid misaligned store on fixed_ptr + 1
            size_t len = std::max(scan_keys[cid].length(), static_cast<size_t>(column.length()));
            Slice slice(variable_ptr, len);
            memcpy(fixed_ptr + 1, &slice, sizeof(Slice));
            variable_ptr += len;
        } else if (type == FieldType::OLAP_FIELD_TYPE_STRING) {
            // Each STRING column takes the next entry of the long-text table.
            _schema->mutable_column(cid)->set_long_text_buf(long_text_ptr);
            // Use memcpy to avoid misaligned store on fixed_ptr + 1
            Slice slice(*(long_text_ptr), DEFAULT_TEXT_LENGTH);
            memcpy(fixed_ptr + 1, &slice, sizeof(Slice));
            ++long_text_ptr;
        }
    }
    return Status::OK();
}
// Initialises the cursor over the first `column_count` columns of `schema`
// (column ids 0 .. column_count - 1).
//
// Returns INVALID_ARGUMENT if `column_count` exceeds the number of columns in
// `schema`; otherwise returns the result of the column-list _init() overload.
Status RowCursor::_init(TabletSchemaSPtr schema, uint32_t column_count) {
    if (column_count > schema->num_columns()) {
        return Status::Error<INVALID_ARGUMENT>(
                "Input param are invalid. Column count is bigger than num_columns of schema. "
                "column_count={}, schema.num_columns={}",
                column_count, schema->num_columns());
    }

    // Fill 0, 1, ..., column_count - 1 with an unsigned counter. This avoids the
    // signed/unsigned comparison of an `int` loop index against a uint32_t bound.
    std::vector<uint32_t> columns(column_count);
    std::iota(columns.begin(), columns.end(), 0U);

    RETURN_IF_ERROR(_init(schema->columns(), columns));
    return Status::OK();
}
// Prepares this cursor to hold a scan key made of `scan_keys.size()` values,
// mapped onto column ids 0 .. scan_keys.size() - 1 of `schema`.
//
// Returns INVALID_ARGUMENT when more key values are supplied than `schema` has
// columns; otherwise propagates the status of the internal init steps.
Status RowCursor::init_scan_key(TabletSchemaSPtr schema,
                                const std::vector<std::string>& scan_keys) {
    const size_t key_count = scan_keys.size();
    if (key_count > schema->num_columns()) {
        return Status::Error<INVALID_ARGUMENT>(
                "Input param are invalid. Column count is bigger than num_columns of schema. "
                "column_count={}, schema.num_columns={}",
                key_count, schema->num_columns());
    }

    // Key column ids are the consecutive integers starting at 0.
    std::vector<uint32_t> key_cids(key_count);
    std::iota(key_cids.begin(), key_cids.end(), 0);

    // Build the schema and fixed buffer first, then the variable-length parts.
    RETURN_IF_ERROR(_init(schema->columns(), key_cids));
    return _init_scan_key(schema, scan_keys);
}
// Variant of init_scan_key() that copies an already-built Schema
// (`shared_schema`) instead of constructing one from the tablet columns.
// The key values map onto column ids 0 .. scan_keys.size() - 1.
//
// Propagates any error from the internal init steps.
Status RowCursor::init_scan_key(TabletSchemaSPtr schema, const std::vector<std::string>& scan_keys,
                                const std::shared_ptr<Schema>& shared_schema) {
    // One column id per supplied key value, counting up from 0.
    std::vector<uint32_t> key_cids(scan_keys.size());
    std::iota(key_cids.begin(), key_cids.end(), 0U);

    RETURN_IF_ERROR(_init(shared_schema, key_cids));
    return _init_scan_key(schema, scan_keys);
}
// Loads the cursor's cells from the textual values in `tuple`.
//
// The i-th tuple entry is written to the i-th column id of _schema. Null
// entries only set the null flag. Non-null entries clear it, have their text
// remembered in _row_string (used by to_string()), and are parsed into the
// cell via the field's from_string().
//
// Returns INVALID_ARGUMENT if the tuple size differs from the number of column
// ids, or the first parse error encountered; OK otherwise.
Status RowCursor::from_tuple(const OlapTuple& tuple) {
    const auto value_count = tuple.size();
    if (value_count != _schema->num_column_ids()) {
        return Status::Error<INVALID_ARGUMENT>(
                "column count does not match. tuple_size={}, field_count={}", value_count,
                _schema->num_column_ids());
    }

    _row_string.resize(value_count);
    for (size_t idx = 0; idx < value_count; ++idx) {
        const auto cid = _schema->column_ids()[idx];
        const StorageField* field = column_schema(cid);

        if (tuple.is_null(idx)) {
            _set_null(cid);
        } else {
            _set_not_null(cid);

            const auto& text = tuple.get_value(idx);
            _row_string[idx] = text;

            // Parse the text straight into the cell's storage.
            char* cell = _cell_ptr(cid);
            Status parse_status =
                    field->from_string(cell, text, field->get_precision(), field->get_scale());
            if (!parse_status.ok()) {
                LOG(WARNING) << "fail to convert field from string. string=" << text
                             << ", res=" << parse_status;
                return parse_status;
            }
        }
    }
    return Status::OK();
}
// Renders the row as text for logging/debugging.
//
// Columns are joined with '|'. Each column is printed as
// "<null-flag>&<value>", where <value> is "NULL" for null cells and otherwise
// the original text recorded by from_tuple(). If no text has been recorded for
// a non-null column (e.g. from_tuple() was never called), the value is left
// empty rather than reading past the end of _row_string.
std::string RowCursor::to_string() const {
    std::string result;
    size_t i = 0;
    for (auto cid : _schema->column_ids()) {
        if (i > 0) {
            result.append("|");
        }

        // Evaluate the null flag once and reuse it for both the flag and the value.
        const auto null_flag = _is_null(cid);
        result.append(std::to_string(null_flag));
        result.append("&");
        if (null_flag) {
            result.append("NULL");
        } else if (i < _row_string.size()) {
            // _row_string is only sized by from_tuple(); guard the index.
            result.append(_row_string[i]);
        }
        ++i;
    }
    return result;
}
// Allocates the variable-length storage sized by _variable_len and, when there
// are STRING columns, a table of _string_field_count text buffers of
// DEFAULT_TEXT_LENGTH bytes each.
//
// Returns MEM_ALLOC_FAILED if any allocation fails. Whatever was allocated up
// to that point stays owned by the cursor and is released by the destructor.
Status RowCursor::_alloc_buf() {
    // Zero-filled buffer backing the VARCHAR/CHAR slices.
    _variable_buf = new (nothrow) char[_variable_len]();
    if (_variable_buf == nullptr) {
        return Status::Error<MEM_ALLOC_FAILED>("Fail to malloc _variable_buf.");
    }

    if (_string_field_count > 0) {
        // calloc (not malloc) so that every slot starts out as a null pointer:
        // if one of the per-slot allocations below fails, the destructor still
        // walks all _string_field_count slots, and free(nullptr) is a no-op
        // whereas free() on an uninitialised pointer is undefined behaviour.
        _long_text_buf = static_cast<char**>(
                calloc(static_cast<size_t>(_string_field_count), sizeof(char*)));
        if (_long_text_buf == nullptr) {
            return Status::Error<MEM_ALLOC_FAILED>("Fail to malloc _long_text_buf.");
        }
        for (int i = 0; i < _string_field_count; ++i) {
            _long_text_buf[i] = static_cast<char*>(malloc(DEFAULT_TEXT_LENGTH * sizeof(char)));
            if (_long_text_buf[i] == nullptr) {
                return Status::Error<MEM_ALLOC_FAILED>("Fail to malloc _long_text_buf.");
            }
        }
    }
    return Status::OK();
}
#include "common/compile_check_end.h"
} // namespace doris