blob: 89c3f4cb168aab0f5dbc61514e0dc5ae5b0c9bd4 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "arrow/csv/parser.h"
#include <algorithm>
#include <cstdio>
#include <utility>
#include "arrow/memory_pool.h"
#include "arrow/status.h"
#include "arrow/util/logging.h"
namespace arrow {
namespace csv {
// Build an Invalid status whose text is `message` preceded by the common
// CSV error prefix.
static Status ParseError(const char* message) {
  auto error = Status::Invalid("CSV parse error: ", message);
  return error;
}
// Build the parse error reported when a row does not have the expected
// number of columns.
static Status MismatchingColumns(int32_t expected, int32_t actual) {
  // With 32-bit values the formatted text needs at most 23 characters of fixed
  // text, 2 * 11 characters of digits and sign, and the terminating NUL.
  constexpr size_t kMessageCapacity = 50;
  char text[kMessageCapacity];
  snprintf(text, kMessageCapacity, "Expected %d columns, got %d", expected, actual);
  return ParseError(text);
}
// Whether byte `c` sorts strictly below the space character.  Both line
// separators recognized by the parser ('\r' and '\n') satisfy this, which
// makes it a cheap pre-filter before testing for them individually.
static inline bool IsControlChar(uint8_t c) {
  constexpr uint8_t kSpace = ' ';
  return c < kSpace;
}
// Compile-time carrier for the parsing features that get specialized on.
// The parsing routines are instantiated once per combination so that the
// tests on `quoting` / `escaping` can be resolved at compile time.
template <bool QuotingEnabled, bool EscapingEnabled>
struct SpecializedOptions {
  // Whether quoted fields are recognized
  static constexpr bool quoting = QuotingEnabled;
  // Whether the escape character is recognized
  static constexpr bool escaping = EscapingEnabled;
};
// A helper class allocating the buffer for parsed values and writing into it
// without any further resizes, except at the end.
//
// The buffer is allocated once, in the constructor, with room for `size`
// bytes.  Characters are then appended one at a time; the capacity is only
// verified by a debug assertion, so callers must not push more than `size`
// characters.  Allocation or resize failures abort through ARROW_CHECK_OK.
class BlockParser::PresizedParsedWriter {
 public:
  // Allocate `size` bytes from `pool` for the parsed characters.
  PresizedParsedWriter(MemoryPool* pool, uint32_t size)
      : parsed_(nullptr),
        parsed_size_(0),
        parsed_capacity_(size),
        // Start from a well-defined checkpoint so that RollbackLine() never reads
        // an indeterminate value, even if no BeginLine() preceded it.
        saved_parsed_size_(0) {
    ARROW_CHECK_OK(AllocateResizableBuffer(pool, parsed_capacity_, &parsed_buffer_));
    parsed_ = parsed_buffer_->mutable_data();
  }

  // Shrink the buffer to the number of bytes written and hand it over.
  void Finish(std::shared_ptr<Buffer>* out_parsed) {
    ARROW_CHECK_OK(parsed_buffer_->Resize(parsed_size_));
    *out_parsed = parsed_buffer_;
  }

  // Remember the current size so that a truncated line can be undone.
  void BeginLine() { saved_parsed_size_ = parsed_size_; }

  // Append one character of field data.
  void PushFieldChar(char c) {
    DCHECK_LT(parsed_size_, parsed_capacity_);
    parsed_[parsed_size_++] = static_cast<uint8_t>(c);
  }

  // Rollback the state that was saved in BeginLine()
  void RollbackLine() { parsed_size_ = saved_parsed_size_; }

  // Number of bytes written so far.
  int64_t size() const { return parsed_size_; }

 protected:
  std::shared_ptr<ResizableBuffer> parsed_buffer_;
  uint8_t* parsed_;
  int64_t parsed_size_;
  int64_t parsed_capacity_;
  // Checkpointing, for when an incomplete line is encountered at end of block
  int64_t saved_parsed_size_;
};
// A helper class handling a growable buffer for values offsets.  This class is
// used when the number of columns is not yet known and we therefore cannot
// efficiently presize the target area for a given number of rows.
//
// The buffer starts with room for 256 value descriptors and doubles whenever
// it is full.  Allocation or resize failures abort through ARROW_CHECK_OK.
class BlockParser::ResizableValuesWriter {
 public:
  explicit ResizableValuesWriter(MemoryPool* pool)
      : values_(nullptr),
        values_size_(0),
        values_capacity_(256),
        // Give the per-field flag and the checkpoint well-defined initial values
        // so that FinishField() / RollbackLine() never read indeterminate state,
        // even if StartField() / BeginLine() did not run first.
        quoted_(false),
        saved_values_size_(0) {
    ARROW_CHECK_OK(AllocateResizableBuffer(pool, values_capacity_ * sizeof(*values_),
                                           &values_buffer_));
    values_ = reinterpret_cast<ValueDesc*>(values_buffer_->mutable_data());
  }

  // Record the initial descriptor: the current size of the parsed data, i.e.
  // the offset at which the first field written after this call begins.
  template <typename ParsedWriter>
  void Start(ParsedWriter& parsed_writer) {
    // Only the low 31 bits of the offset are kept (presumably ValueDesc stores
    // the offset in a 31-bit field next to the quoted flag -- confirm against
    // its declaration).
    PushValue({static_cast<uint32_t>(parsed_writer.size()) & 0x7fffffffU, false});
  }

  // Shrink the buffer to the descriptors actually written and hand it over.
  void Finish(std::shared_ptr<Buffer>* out_values) {
    ARROW_CHECK_OK(values_buffer_->Resize(values_size_ * sizeof(*values_)));
    *out_values = values_buffer_;
  }

  // Remember the current size so that a truncated line can be undone.
  void BeginLine() { saved_values_size_ = values_size_; }

  // Note whether the field about to be parsed is quoted.
  void StartField(bool quoted) { quoted_ = quoted; }

  // Record the end offset of the current field together with its quoted flag.
  template <typename ParsedWriter>
  void FinishField(ParsedWriter* parsed_writer) {
    PushValue({static_cast<uint32_t>(parsed_writer->size()) & 0x7fffffffU, quoted_});
  }

  // Rollback the state that was saved in BeginLine()
  void RollbackLine() { values_size_ = saved_values_size_; }

 protected:
  // Append a descriptor, doubling the buffer first if it is full.
  void PushValue(ValueDesc v) {
    if (ARROW_PREDICT_FALSE(values_size_ == values_capacity_)) {
      values_capacity_ = values_capacity_ * 2;
      ARROW_CHECK_OK(values_buffer_->Resize(values_capacity_ * sizeof(*values_)));
      // The resize may have moved the data
      values_ = reinterpret_cast<ValueDesc*>(values_buffer_->mutable_data());
    }
    values_[values_size_++] = v;
  }

  std::shared_ptr<ResizableBuffer> values_buffer_;
  ValueDesc* values_;
  int64_t values_size_;
  int64_t values_capacity_;
  bool quoted_;
  // Checkpointing, for when an incomplete line is encountered at end of block
  int64_t saved_values_size_;
};
// A helper class allocating the buffer for values offsets and writing into it
// without any further resizes, except at the end.  This class is used once the
// number of columns is known, as it eliminates resizes and generates simpler,
// faster CSV parsing code.
//
// The buffer holds one initial descriptor plus `num_rows * num_cols` field
// descriptors.  Allocation or resize failures abort through ARROW_CHECK_OK.
class BlockParser::PresizedValuesWriter {
 public:
  PresizedValuesWriter(MemoryPool* pool, int32_t num_rows, int32_t num_cols)
      : values_(nullptr),
        values_size_(0),
        // Widen before multiplying: the product of two int32_t values can
        // exceed the int32_t range.
        values_capacity_(1 + static_cast<int64_t>(num_rows) * num_cols),
        // Well-defined initial values for the per-field flag and the checkpoint
        quoted_(false),
        saved_values_size_(0) {
    ARROW_CHECK_OK(AllocateResizableBuffer(pool, values_capacity_ * sizeof(*values_),
                                           &values_buffer_));
    values_ = reinterpret_cast<ValueDesc*>(values_buffer_->mutable_data());
  }

  // Record the initial descriptor: the current size of the parsed data, i.e.
  // the offset at which the first field written after this call begins.
  template <typename ParsedWriter>
  void Start(ParsedWriter& parsed_writer) {
    PushValue({static_cast<uint32_t>(parsed_writer.size()) & 0x7fffffffU, false});
  }

  // Shrink the buffer to the descriptors actually written and hand it over.
  void Finish(std::shared_ptr<Buffer>* out_values) {
    ARROW_CHECK_OK(values_buffer_->Resize(values_size_ * sizeof(*values_)));
    *out_values = values_buffer_;
  }

  // Remember the current size so that a truncated line can be undone.
  void BeginLine() { saved_values_size_ = values_size_; }

  // Note whether the field about to be parsed is quoted.
  void StartField(bool quoted) { quoted_ = quoted; }

  // Record the end offset of the current field together with its quoted flag.
  template <typename ParsedWriter>
  void FinishField(ParsedWriter* parsed_writer) {
    PushValue({static_cast<uint32_t>(parsed_writer->size()) & 0x7fffffffU, quoted_});
  }

  // Rollback the state that was saved in BeginLine()
  void RollbackLine() { values_size_ = saved_values_size_; }

 protected:
  // Append a descriptor, unless the presized area is already full.
  void PushValue(ValueDesc v) {
    // ParseLine() records each field as soon as it ends and only compares the
    // column count with the expected one when the line is complete.  A row
    // with too many fields can therefore reach this point with no room left.
    // Drop the excess descriptor instead of writing past the buffer; the row
    // is then either rejected by the column count check or rolled back.
    if (ARROW_PREDICT_FALSE(values_size_ >= values_capacity_)) {
      return;
    }
    values_[values_size_++] = v;
  }

  std::shared_ptr<ResizableBuffer> values_buffer_;
  ValueDesc* values_;
  int64_t values_size_;
  const int64_t values_capacity_;
  bool quoted_;
  // Checkpointing, for when an incomplete line is encountered at end of block
  int64_t saved_values_size_;
};
// Parse a single line starting at `data` (which must be < `data_end`).
//
// Field characters are appended to `parsed_writer` and one descriptor per
// field is appended to `values_writer`.  Three outcomes return Status::OK():
// - a complete line was parsed: `num_rows_` is incremented and `*out_data`
//   points just past the line (including its separator);
// - an empty line was skipped (only if `options_.ignore_empty_lines`):
//   `*out_data` points past its separator, nothing else changes;
// - the input ended in the middle of a line and `is_final` is false: both
//   writers are rolled back and `*out_data` is left untouched.
// If `is_final` is true, input ending in the middle of a line is treated as
// the end of that line.
//
// The first complete line sets `num_cols_` if it was -1; any later line with
// a different number of fields yields a parse error.
template <typename SpecializedOptions, typename ValuesWriter, typename ParsedWriter>
Status BlockParser::ParseLine(ValuesWriter* values_writer, ParsedWriter* parsed_writer,
                              const char* data, const char* data_end, bool is_final,
                              const char** out_data) {
  int32_t num_cols = 0;
  char c;

  DCHECK_GT(data_end, data);

  auto FinishField = [&]() { values_writer->FinishField(parsed_writer); };

  // Checkpoint both writers so that a truncated line can be undone
  values_writer->BeginLine();
  parsed_writer->BeginLine();

  // The parsing state machine

  // Special case empty lines: do we start with a newline separator?
  c = *data;
  if (ARROW_PREDICT_FALSE(IsControlChar(c)) && options_.ignore_empty_lines) {
    if (c == '\r') {
      data++;
      if (data < data_end && *data == '\n') {
        data++;
      }
      goto EmptyLine;
    }
    if (c == '\n') {
      data++;
      goto EmptyLine;
    }
  }

FieldStart:
  // At the start of a field
  // Quoting is only recognized at start of field
  if (SpecializedOptions::quoting && ARROW_PREDICT_FALSE(*data == options_.quote_char)) {
    ++data;
    values_writer->StartField(true /* quoted */);
    goto InQuotedField;
  } else {
    values_writer->StartField(false /* quoted */);
    goto InField;
  }

InField:
  // Inside a non-quoted part of a field
  if (ARROW_PREDICT_FALSE(data == data_end)) {
    goto AbortLine;
  }
  c = *data++;
  if (SpecializedOptions::escaping && ARROW_PREDICT_FALSE(c == options_.escape_char)) {
    if (ARROW_PREDICT_FALSE(data == data_end)) {
      goto AbortLine;
    }
    // The escaped character is taken literally
    c = *data++;
    parsed_writer->PushFieldChar(c);
    goto InField;
  }
  if (ARROW_PREDICT_FALSE(c == options_.delimiter)) {
    goto FieldEnd;
  }
  if (ARROW_PREDICT_FALSE(IsControlChar(c))) {
    if (c == '\r') {
      // In the middle of a newline separator?
      if (ARROW_PREDICT_TRUE(data < data_end) && *data == '\n') {
        data++;
      }
      goto LineEnd;
    }
    if (c == '\n') {
      goto LineEnd;
    }
  }
  parsed_writer->PushFieldChar(c);
  goto InField;

InQuotedField:
  // Inside a quoted part of a field
  if (ARROW_PREDICT_FALSE(data == data_end)) {
    goto AbortLine;
  }
  c = *data++;
  if (SpecializedOptions::escaping && ARROW_PREDICT_FALSE(c == options_.escape_char)) {
    if (ARROW_PREDICT_FALSE(data == data_end)) {
      goto AbortLine;
    }
    // The escaped character is taken literally
    c = *data++;
    parsed_writer->PushFieldChar(c);
    goto InQuotedField;
  }
  if (ARROW_PREDICT_FALSE(c == options_.quote_char)) {
    if (options_.double_quote && ARROW_PREDICT_TRUE(data < data_end) &&
        ARROW_PREDICT_FALSE(*data == options_.quote_char)) {
      // Double-quoting: skip the second quote and emit a single one below
      ++data;
    } else {
      // End of single-quoting
      goto InField;
    }
  }
  parsed_writer->PushFieldChar(c);
  goto InQuotedField;

FieldEnd:
  // At the end of a field
  FinishField();
  ++num_cols;
  if (ARROW_PREDICT_FALSE(data == data_end)) {
    // The input stops right after a delimiter.  In a final block this yields
    // a trailing empty field that never goes through FieldStart, so mark it
    // unquoted here; otherwise it would inherit the previous field's flag.
    values_writer->StartField(false /* quoted */);
    goto AbortLine;
  }
  goto FieldStart;

LineEnd:
  // At the end of line
  FinishField();
  ++num_cols;
  if (ARROW_PREDICT_FALSE(num_cols != num_cols_)) {
    if (num_cols_ == -1) {
      // First line seen: it defines the number of columns
      num_cols_ = num_cols;
    } else {
      return MismatchingColumns(num_cols_, num_cols);
    }
  }
  ++num_rows_;
  *out_data = data;
  return Status::OK();

AbortLine:
  // Not a full line except perhaps if in final block
  if (is_final) {
    // End of input terminates the line just like a newline separator would
    goto LineEnd;
  }
  // Truncated line at end of block, rewind parsed state
  values_writer->RollbackLine();
  parsed_writer->RollbackLine();
  return Status::OK();

EmptyLine:
  *out_data = data;
  return Status::OK();
}
// Parse up to `rows_in_chunk` rows from [data, data_end) with the given
// writers, then append the resulting values buffer to `values_buffers_`.
//
// On return `*out_data` points past the last line consumed.
// `*finished_parsing` is set to true when a line could not be parsed any
// further (truncated line at end of block); it is never reset to false here.
// Errors from ParseLine() are propagated.
template <typename SpecializedOptions, typename ValuesWriter, typename ParsedWriter>
Status BlockParser::ParseChunk(ValuesWriter* values_writer, ParsedWriter* parsed_writer,
                               const char* data, const char* data_end, bool is_final,
                               int32_t rows_in_chunk, const char** out_data,
                               bool* finished_parsing) {
  // Budget the chunk on the rows actually produced rather than on the lines
  // consumed: skipped empty lines must not use up the budget.  Otherwise a
  // one-row chunk starting with an empty line would end without having
  // parsed any row, and the number of columns could not be inferred from it.
  const auto num_rows_deadline = num_rows_ + rows_in_chunk;

  while (data < data_end && num_rows_ < num_rows_deadline) {
    const char* line_end = data;
    RETURN_NOT_OK(ParseLine<SpecializedOptions>(values_writer, parsed_writer, data,
                                                data_end, is_final, &line_end));
    if (line_end == data) {
      // Cannot parse any further
      *finished_parsing = true;
      break;
    }
    data = line_end;
  }

  // Append new buffers and update size
  std::shared_ptr<Buffer> values_buffer;
  values_writer->Finish(&values_buffer);
  if (values_buffer->size() > 0) {
    // The first descriptor of each buffer is the initial offset, not a value
    values_size_ += static_cast<int32_t>(values_buffer->size() / sizeof(ValueDesc) - 1);
    values_buffers_.push_back(std::move(values_buffer));
  }
  *out_data = data;
  return Status::OK();
}
// Parse the block [start, start + size) with the compile-time options given
// by `SpecializedOptions`, replacing any result of a previous parse.
//
// `*out_size` receives the number of bytes consumed.  If the number of
// columns is not known yet and cannot be inferred from this block, a parse
// error is returned.
template <typename SpecializedOptions>
Status BlockParser::DoParseSpecialized(const char* start, uint32_t size, bool is_final,
                                       uint32_t* out_size) {
  // Forget whatever a previous call produced
  num_rows_ = 0;
  values_size_ = 0;
  parsed_size_ = 0;
  values_buffers_.clear();
  parsed_buffer_.reset();
  parsed_ = nullptr;

  const char* const block_end = start + size;
  const char* cursor = start;
  bool done = false;

  // A single parsed-data writer is shared by all chunks of this block
  PresizedParsedWriter parsed_writer(pool_, size);

  if (num_cols_ == -1) {
    // The values area cannot be presized without knowing the number of
    // columns: parse one line with a growable writer to find out.
    ResizableValuesWriter first_line_writer(pool_);
    first_line_writer.Start(parsed_writer);

    RETURN_NOT_OK(ParseChunk<SpecializedOptions>(&first_line_writer, &parsed_writer,
                                                 cursor, block_end, is_final,
                                                 /*rows_in_chunk=*/1, &cursor, &done));
    if (num_cols_ == -1) {
      return ParseError("Empty CSV file or block: cannot infer number of columns");
    }
  }

  constexpr int32_t kTargetChunkSize = 32768;

  while (!done && cursor < block_end && num_rows_ < max_num_rows_) {
    // The number of columns is known from here on, so each chunk gets a
    // values array presized for a fixed number of rows.
    DCHECK_GE(num_cols_, 0);

    const int32_t rows_wanted =
        (num_cols_ > 0) ? std::max(kTargetChunkSize / num_cols_, 512) : kTargetChunkSize;
    const int32_t rows_in_chunk = std::min(rows_wanted, max_num_rows_ - num_rows_);

    PresizedValuesWriter chunk_writer(pool_, rows_in_chunk, num_cols_);
    chunk_writer.Start(parsed_writer);

    RETURN_NOT_OK(ParseChunk<SpecializedOptions>(&chunk_writer, &parsed_writer, cursor,
                                                 block_end, is_final, rows_in_chunk,
                                                 &cursor, &done));
  }

  // Publish the parsed data
  parsed_writer.Finish(&parsed_buffer_);
  parsed_size_ = static_cast<int32_t>(parsed_buffer_->size());
  parsed_ = parsed_buffer_->data();

  DCHECK_EQ(values_size_, num_rows_ * num_cols_);
  if (num_cols_ == -1) {
    DCHECK_EQ(num_rows_, 0);
  }
#ifndef NDEBUG
  // The last descriptor written must point at the end of the parsed data
  if (num_rows_ > 0) {
    DCHECK_GT(values_buffers_.size(), 0);
    const auto& tail_buffer = values_buffers_.back();
    const auto* tail_values = reinterpret_cast<const ValueDesc*>(tail_buffer->data());
    const auto tail_length = tail_buffer->size() / sizeof(ValueDesc);
    const auto expected_parsed_size =
        static_cast<int32_t>(tail_values[tail_length - 1].offset);
    DCHECK_EQ(parsed_size_, expected_parsed_size);
  } else {
    DCHECK_EQ(parsed_size_, 0);
  }
#endif

  *out_size = static_cast<uint32_t>(cursor - start);
  return Status::OK();
}
// Dispatch to the DoParseSpecialized() instantiation matching the runtime
// `quoting` and `escaping` options.
Status BlockParser::DoParse(const char* start, uint32_t size, bool is_final,
                            uint32_t* out_size) {
  const bool with_quoting = options_.quoting;
  const bool with_escaping = options_.escaping;

  if (with_quoting && with_escaping) {
    return DoParseSpecialized<SpecializedOptions<true, true>>(start, size, is_final,
                                                              out_size);
  }
  if (with_quoting) {
    return DoParseSpecialized<SpecializedOptions<true, false>>(start, size, is_final,
                                                               out_size);
  }
  if (with_escaping) {
    return DoParseSpecialized<SpecializedOptions<false, true>>(start, size, is_final,
                                                               out_size);
  }
  return DoParseSpecialized<SpecializedOptions<false, false>>(start, size, is_final,
                                                              out_size);
}
// Parse a block that is not the last one: a line left incomplete at the end
// of the block is not consumed.  `*out_size` receives the number of bytes
// consumed.
Status BlockParser::Parse(const char* data, uint32_t size, uint32_t* out_size) {
  constexpr bool kIsFinal = false;
  return DoParse(data, size, kIsFinal, out_size);
}
// Parse the last block: the end of the block also ends the line in progress.
// `*out_size` receives the number of bytes consumed.
Status BlockParser::ParseFinal(const char* data, uint32_t size, uint32_t* out_size) {
  constexpr bool kIsFinal = true;
  return DoParse(data, size, kIsFinal, out_size);
}
// Create a parser allocating from `pool`.  A `num_cols` of -1 means the
// number of columns is inferred from the first line parsed.  Parsing a block
// stops once `max_num_rows` rows have been produced.
BlockParser::BlockParser(MemoryPool* pool, ParseOptions options, int32_t num_cols,
                         int32_t max_num_rows)
    : pool_(pool),
      options_(options),
      num_cols_(num_cols),
      max_num_rows_(max_num_rows) {
}
// Same as the constructor taking a MemoryPool, but allocating from the pool
// returned by default_memory_pool().
BlockParser::BlockParser(ParseOptions options, int32_t num_cols, int32_t max_num_rows)
    : BlockParser(default_memory_pool(), options, num_cols, max_num_rows) {
}
} // namespace csv
} // namespace arrow