blob: 80cf953ee65592e9087991b595d603f0ee760359 [file] [log] [blame]
/**
* Copyright 2011-2015 Quickstep Technologies LLC.
* Copyright 2015-2016 Pivotal Software, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
#include "relational_operators/TextScanOperator.hpp"

#include <algorithm>
#include <cctype>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "catalog/CatalogAttribute.hpp"
#include "catalog/CatalogRelationSchema.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryExecutionUtil.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
#include "relational_operators/TextScanOperator.pb.h"
#include "storage/InsertDestination.hpp"
#include "storage/StorageBlob.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageManager.hpp"
#include "threading/ThreadIDBasedMap.hpp"
#include "types/Type.hpp"
#include "types/TypedValue.hpp"
#include "types/containers/Tuple.hpp"
#include "utility/Glob.hpp"

#include "gflags/gflags.h"
#include "glog/logging.h"
#include "tmb/id_typedefs.h"
#include "tmb/message_bus.h"
#include "tmb/tagged_message.h"
using std::isxdigit;
using std::size_t;
using std::sscanf;
using std::string;
namespace quickstep {
// Command-line flag --textscan_split_blob_size (default 2): the number of
// slots passed to StorageManager::createBlob() for every blob that
// TextSplitWorkOrder allocates while splitting an input text file. A value of
// zero is rejected by the flag validator registered further down.
DEFINE_uint64(textscan_split_blob_size, 2,
"Size of blobs in number of slots the input text files "
"are split into in the TextScanOperator.");
// Flag validator for --textscan_split_blob_size. Accepts every non-zero
// 'blob_size'; for zero it logs an error that names the flag and reports the
// value as invalid.
static bool ValidateTextScanSplitBlobSize(const char *flagname,
                                          std::uint64_t blob_size) {
  const bool is_positive = (blob_size != 0);
  if (!is_positive) {
    LOG(ERROR) << "--" << flagname << " must be greater than 0";
  }
  return is_positive;
}
// Registers ValidateTextScanSplitBlobSize as the validator for
// --textscan_split_blob_size. The call runs as part of initializing this
// file-local variable; the variable itself only stores the registration
// result and is not read anywhere in this file.
static const volatile bool text_scan_split_blob_size_dummy = gflags::RegisterFlagValidator(
&FLAGS_textscan_split_blob_size, &ValidateTextScanSplitBlobSize);
namespace {
// Detect whether '*search_string' contains a row-terminator (either line-feed
// or carriage-return + line-feed) immediately before 'end_pos', i.e. ending at
// index 'end_pos - 1'.
//
// If 'process_escape_sequences' is true, a terminator that is preceded by an
// odd number of consecutive backslashes is considered escaped and is not
// reported. An even number of backslashes is a run of escaped backslash
// literals, so the terminator still counts.
//
// Returns the number of characters in the row-terminator (1 for LF, 2 for
// CR-LF), or 0 if no unescaped terminator ends at 'end_pos'.
inline unsigned DetectRowTerminator(const char *search_string,
                                    std::size_t end_pos,
                                    const bool process_escape_sequences) {
  if (end_pos == 0) {
    // Empty string.
    return 0;
  }
  if (search_string[end_pos - 1] != '\n') {
    // String doesn't end in newline.
    return 0;
  }
  if (end_pos == 1) {
    // String is the single newline character.
    return 1;
  }

  const bool have_carriage_return = (search_string[end_pos - 2] == '\r');
  if (have_carriage_return && (end_pos == 2)) {
    // String is CR-LF and nothing else.
    return 2;
  }

  const unsigned terminator_length = have_carriage_return ? 2u : 1u;
  if (!process_escape_sequences) {
    return terminator_length;
  }

  // Walk backwards over the backslashes that directly precede the terminator.
  // 'preceding' is the number of characters in front of the terminator; it is
  // at least 1 here because the terminator-only strings returned above. The
  // loop tests index 'preceding - 1', so a backslash at index 0 is counted
  // like any other (the previous implementation skipped it when it was the
  // only character before the terminator).
  std::size_t backslashes = 0;
  for (std::size_t preceding = end_pos - terminator_length;
       (preceding != 0) && (search_string[preceding - 1] == '\\');
       --preceding) {
    ++backslashes;
  }

  return (backslashes & 0x1) ? 0 : terminator_length;
}
} // namespace
// Generates the work orders for this operator and adds them to '*container'.
// Returns true once no further work orders will be produced.
//
// The file pattern is glob-expanded on every call and it is fatal for it to
// match nothing. In serial mode one TextScanWorkOrder per file is emitted the
// first time the blocking dependencies are met. In parallel mode the first
// such call emits one TextSplitWorkOrder per file; later calls drain the queue
// of text blobs reported through receiveFeedbackMessage() into
// TextScanWorkOrders.
bool TextScanOperator::getAllWorkOrders(
    WorkOrdersContainer *container,
    QueryContext *query_context,
    StorageManager *storage_manager,
    const tmb::client_id scheduler_client_id,
    tmb::MessageBus *bus) {
  DCHECK(query_context != nullptr);

  const std::vector<std::string> files = utility::file::GlobExpand(file_pattern_);
  if (files.size() == 0) {
    LOG(FATAL) << "No files matched '" << file_pattern_ << "'. Exiting.";
  }

  InsertDestination *output_destination =
      query_context->getInsertDestination(output_destination_index_);

  if (!parallelize_load_) {
    // Serial load: each matched file is scanned start-to-finish by a single
    // work order.
    if (blocking_dependencies_met_ && !work_generated_) {
      for (const auto &input_file : files) {
        container->addNormalWorkOrder(
            new TextScanWorkOrder(input_file,
                                  field_terminator_,
                                  process_escape_sequences_,
                                  output_destination,
                                  storage_manager),
            op_index_);
      }
      work_generated_ = true;
    }
    return work_generated_;
  }

  // Parallel load. (More than one file can be loaded, because the file name
  // supports glob() semantics.)
  if (!blocking_dependencies_met_) {
    return false;
  }

  if (!work_generated_) {
    // Phase one: a split work order per file reads the file and cuts it into
    // blobs that can be parsed independently of each other.
    for (const auto &input_file : files) {
      container->addNormalWorkOrder(
          new TextSplitWorkOrder(input_file,
                                 process_escape_sequences_,
                                 storage_manager,
                                 op_index_,
                                 scheduler_client_id,
                                 bus),
          op_index_);
      ++num_split_work_orders_;
    }
    work_generated_ = true;
    return false;
  }

  // Phase two: turn every blob queued so far into a scan work order.
  while (!text_blob_queue_.empty()) {
    const TextBlob pending_blob = text_blob_queue_.popOne();
    container->addNormalWorkOrder(
        new TextScanWorkOrder(pending_blob.blob_id,
                              pending_blob.size,
                              field_terminator_,
                              process_escape_sequences_,
                              output_destination,
                              storage_manager),
        op_index_);
  }

  // Finished only when every split work order has reported completion and no
  // blob arrived in the meantime.
  const bool all_splits_done =
      num_done_split_work_orders_.load(std::memory_order_acquire) == num_split_work_orders_;
  return all_splits_done && text_blob_queue_.empty();
}
// Handles feedback sent by this operator's work orders:
//  - kSplitWorkOrderCompletionMessage bumps the count of finished split work
//    orders.
//  - kNewTextBlobMessage carries a serialized TextBlob proto, which is decoded
//    and queued for scanning.
// Any other message type is logged as an error and otherwise ignored.
void TextScanOperator::receiveFeedbackMessage(const WorkOrder::FeedbackMessage &msg) {
  const auto message_type = msg.type();

  if (message_type == kSplitWorkOrderCompletionMessage) {
    num_done_split_work_orders_.fetch_add(1, std::memory_order_release);
    return;
  }

  if (message_type == kNewTextBlobMessage) {
    serialization::TextBlob blob_proto;
    CHECK(blob_proto.ParseFromArray(msg.payload(), msg.payload_size()));
    text_blob_queue_.push(TextBlob(blob_proto.blob_id(), blob_proto.size()));
    return;
  }

  LOG(ERROR) << "Unknown feedback message type for TextScanOperator";
}
// Constructs a work order that scans an entire text file: is_file_ is set to
// true so that execute() opens 'filename' and reads it row by row. The
// blob-related members (text_blob_, text_size_) are zeroed; execute() does not
// read them on the file path. 'output_destination' and 'storage_manager' are
// stored as-is and must be non-null (checked with DCHECK).
TextScanWorkOrder::TextScanWorkOrder(const std::string &filename,
const char field_terminator,
const bool process_escape_sequences,
InsertDestination *output_destination,
StorageManager *storage_manager)
: is_file_(true),
filename_(filename),
field_terminator_(field_terminator),
text_blob_(0),
text_size_(0),
process_escape_sequences_(process_escape_sequences),
output_destination_(output_destination),
storage_manager_(storage_manager) {
DCHECK(output_destination_ != nullptr);
DCHECK(storage_manager_ != nullptr);
}
// Constructs a work order that scans text held in a storage blob: is_file_ is
// set to false so that execute() fetches blob 'text_blob' from the storage
// manager and parses its first 'text_size' bytes. filename_ is not named in
// the initializer list. 'output_destination' and 'storage_manager' are stored
// as-is and must be non-null (checked with DCHECK).
TextScanWorkOrder::TextScanWorkOrder(const block_id text_blob,
const std::size_t text_size,
const char field_terminator,
const bool process_escape_sequences,
InsertDestination *output_destination,
StorageManager *storage_manager)
: is_file_(false),
field_terminator_(field_terminator),
text_blob_(text_blob),
text_size_(text_size),
process_escape_sequences_(process_escape_sequences),
output_destination_(output_destination),
storage_manager_(storage_manager) {
DCHECK(output_destination_ != nullptr);
DCHECK(storage_manager_ != nullptr);
}
// Reads every row from the input (a whole file when is_file_ is true,
// otherwise the first text_size_ bytes of blob text_blob_), parses each row
// against the output relation's schema, and inserts the resulting tuples into
// the output destination.
//
// Throws TextScanReadError if the file cannot be opened or read, and
// TextScanFormatError (from the row readers / parseRow) for malformed input.
void TextScanWorkOrder::execute() {
  const CatalogRelationSchema &relation = output_destination_->getRelation();
  string current_row_string;

  if (is_file_) {
    // Own the FILE handle through RAII so that it is closed on every exit
    // path. readRowFromFile() and parseRow() both throw on bad input, which
    // previously skipped the fclose() at the end of this branch and leaked
    // the handle.
    struct FileCloser {
      void operator()(FILE *handle) const { std::fclose(handle); }
    };
    std::unique_ptr<FILE, FileCloser> file(std::fopen(filename_.c_str(), "r"));
    if (file == nullptr) {
      throw TextScanReadError(filename_);
    }

    for (;;) {
      // The row readers append to the string and use its emptiness to tell a
      // clean end-of-input from a truncated row, so always start from empty.
      current_row_string.clear();
      if (!readRowFromFile(file.get(), &current_row_string)) {
        break;
      }
      Tuple tuple = parseRow(current_row_string, relation);
      output_destination_->insertTupleInBatch(tuple);
    }
  } else {
    BlobReference blob = storage_manager_->getBlob(text_blob_);
    const char *blob_pos = static_cast<const char*>(blob->getMemory());
    const char *blob_end = blob_pos + text_size_;

    for (;;) {
      current_row_string.clear();
      if (!readRowFromBlob(&blob_pos, blob_end, &current_row_string)) {
        break;
      }
      Tuple tuple = parseRow(current_row_string, relation);
      output_destination_->insertTupleInBatch(tuple);
    }
  }
}
// Parses up to three octal digits from 'row_string' beginning at '*start_pos'
// and returns the accumulated value converted to char. '*start_pos' is
// advanced past the digits that were consumed; parsing stops early at the
// first non-octal character or at the end of the string.
char TextScanWorkOrder::ParseOctalLiteral(const std::string &row_string,
                                          std::size_t *start_pos) {
  const std::size_t limit = std::min(row_string.length(), *start_pos + 3);
  int accumulated = 0;
  while (*start_pos < limit) {
    const int digit = row_string[*start_pos] - '0';
    if ((digit < 0) || (digit > 7)) {
      // Not an octal digit: leave '*start_pos' pointing at it.
      break;
    }
    accumulated = accumulated * 8 + digit;
    ++*start_pos;
  }
  return accumulated;
}
// Parses up to two hexadecimal digits (either case) from 'row_string'
// beginning at '*start_pos' and returns the accumulated value converted to
// char. '*start_pos' is advanced past the digits that were consumed; parsing
// stops early at the first non-hex character or at the end of the string.
char TextScanWorkOrder::ParseHexLiteral(const std::string &row_string,
                                        std::size_t *start_pos) {
  const std::size_t stop_pos = std::min(row_string.length(), *start_pos + 2);
  int value = 0;
  for (; *start_pos < stop_pos; ++*start_pos) {
    // The <cctype> classifiers have undefined behavior when given a negative
    // value other than EOF, which is what a plain 'char' holding a byte >=
    // 0x80 becomes on platforms where char is signed. Classify the byte as
    // unsigned char instead.
    const unsigned char current = static_cast<unsigned char>(row_string[*start_pos]);
    if (!std::isxdigit(current)) {
      break;
    }

    int char_value;
    if (std::isdigit(current)) {
      char_value = current - '0';
    } else if (std::islower(current)) {
      char_value = current - 'a' + 10;
    } else {
      char_value = current - 'A' + 10;
    }
    value = value * 16 + char_value;
  }
  return static_cast<char>(value);
}
// Appends text from 'file' to '*row_string' until a genuine (unescaped) row
// terminator has been read. On success the terminator is stripped, a trailing
// field terminator is appended in its place, and true is returned.
//
// Returns false when end-of-file is hit with nothing accumulated. Throws
// TextScanFormatError if the file ends in the middle of a row, and
// TextScanReadError if reading fails for any other reason.
bool TextScanWorkOrder::readRowFromFile(FILE *file, std::string *row_string) const {
  // Each fgets() call yields at most 1023 chars plus the null-terminator.
  static constexpr std::size_t kRowBufferSize = 1024;
  char row_buffer[kRowBufferSize];

  while (std::fgets(row_buffer, sizeof(row_buffer), file) != nullptr) {
    // Accumulate this chunk, then check whether the row is now complete.
    row_string->append(row_buffer);
    if (removeRowTerminator(row_string)) {
      row_string->push_back(field_terminator_);
      return true;
    }
  }

  // fgets() returned null: either a read error or end-of-file.
  if (!std::feof(file)) {
    throw TextScanReadError(filename_);
  }
  if (!row_string->empty()) {
    throw TextScanFormatError("File ended without delimiter");
  }
  return false;
}
// Appends text from the range ['*start_pos', 'end_pos') to '*row_string' until
// a genuine (unescaped) row terminator has been consumed, advancing
// '*start_pos' past everything that was read. On success the terminator is
// stripped, a trailing field terminator is appended in its place, and true is
// returned.
//
// Returns false when the range is exhausted with nothing accumulated. Throws
// TextScanFormatError if the range ends in the middle of a row.
bool TextScanWorkOrder::readRowFromBlob(const char **start_pos,
                                        const char *end_pos,
                                        std::string *row_string) const {
  while (*start_pos != end_pos) {
    const std::size_t bytes_left = end_pos - *start_pos;
    const void *newline_hit = std::memchr(*start_pos, '\n', bytes_left);
    if (newline_hit == nullptr) {
      throw TextScanFormatError("File ended without delimiter");
    }

    // Take everything up to and including the newline, then check whether
    // that newline really terminates the row.
    const char *segment_end = static_cast<const char*>(newline_hit) + 1;
    row_string->append(*start_pos, static_cast<std::size_t>(segment_end - *start_pos));
    *start_pos = segment_end;

    if (removeRowTerminator(row_string)) {
      row_string->push_back(field_terminator_);
      return true;
    }
  }

  if (!row_string->empty()) {
    throw TextScanFormatError("File ended without delimiter");
  }
  return false;
}
// If '*row_string' ends in a row terminator (as judged by
// DetectRowTerminator() with this work order's escape setting), chops the
// terminator off and returns true. Otherwise leaves the string untouched and
// returns false.
bool TextScanWorkOrder::removeRowTerminator(std::string *row_string) const {
  const std::size_t row_length = row_string->length();
  const unsigned terminator_length = DetectRowTerminator(row_string->c_str(),
                                                         row_length,
                                                         process_escape_sequences_);
  if (terminator_length != 0) {
    row_string->resize(row_length - terminator_length);
  }
  return terminator_length != 0;
}
// Extracts the next field of 'row_string', starting at '*start_pos', and
// appends its text to '*field_string'. '*start_pos' is advanced to just past
// the field's terminator.
//
// When escape processing is enabled, backslash escape sequences are expanded
// while copying: octal (\ooo) and hex (\xhh) character literals, the usual
// single-letter escapes (\b \f \n \r \t \v \\), and an escaped field
// terminator; any other escaped character is copied as-is.
//
// Returns false (appending nothing) if the whole field is the NULL literal
// "\N", true otherwise. Throws TextScanFormatError if "\N" appears alongside
// other data in the field, or if the field has no terminator.
bool TextScanWorkOrder::extractFieldString(const std::string &row_string,
                                           std::size_t *start_pos,
                                           std::string *field_string) const {
  // Check for NULL literal string.
  if (process_escape_sequences_
      && (row_string.length() - *start_pos >= 3)
      && (row_string[*start_pos] == '\\')
      && (row_string[*start_pos + 1] == 'N')
      && (row_string[*start_pos + 2] == field_terminator_)) {
    *start_pos += 3;
    return false;
  }

  // Scan up until terminator, expanding backslashed escape sequences as we go.
  std::size_t terminator_pos = row_string.find(field_terminator_, *start_pos);
  std::size_t scan_pos = *start_pos;

  if (process_escape_sequences_) {
    for (;;) {
      const std::size_t backslash_pos = row_string.find('\\', scan_pos);
      if ((backslash_pos == std::string::npos) || (backslash_pos >= terminator_pos)) {
        // No more backslashes, or the next backslash is beyond the field
        // terminator.
        break;
      }

      // Copy up to the backslash.
      field_string->append(row_string, scan_pos, backslash_pos - scan_pos);

      if (backslash_pos + 1 == terminator_pos) {
        // The terminator we found was escaped by a backslash, so append the
        // literal terminator and re-scan for the next terminator character.
        field_string->push_back(field_terminator_);
        scan_pos = terminator_pos + 1;
        terminator_pos = row_string.find(field_terminator_, scan_pos);
        continue;
      }

      // Expand escape sequence. Most escapes consume exactly the backslash
      // and one following character; the octal and hex literal parsers move
      // 'next_scan_pos' themselves.
      const char escaped_char = row_string[backslash_pos + 1];
      std::size_t next_scan_pos = backslash_pos + 2;
      switch (escaped_char) {
        case '0':  // Fallthrough for octal digits.
        case '1':
        case '2':
        case '3':
        case '4':
        case '5':
        case '6':
        case '7':
          // Octal char literal: digits start right after the backslash.
          next_scan_pos = backslash_pos + 1;
          field_string->push_back(ParseOctalLiteral(row_string, &next_scan_pos));
          break;
        case 'N': {
          // Null literal after some other column data.
          throw TextScanFormatError(
              "Null indicator '\\N' encountered in text scan mixed in with "
              "other column data.");
        }
        case '\\':
          // Backslash.
          field_string->push_back('\\');
          break;
        case 'b':
          // Backspace.
          field_string->push_back('\b');
          break;
        case 'f':
          // Form-feed.
          field_string->push_back('\f');
          break;
        case 'n':
          // Newline.
          field_string->push_back('\n');
          break;
        case 'r':
          // Carriage return.
          field_string->push_back('\r');
          break;
        case 't':
          // Tab.
          field_string->push_back('\t');
          break;
        case 'v':
          // Vertical tab.
          field_string->push_back('\v');
          break;
        case 'x':
          // isxdigit() must be given an unsigned char value: passing a
          // negative plain char (a byte >= 0x80 where char is signed) is
          // undefined behavior.
          if ((backslash_pos + 2 < row_string.length())
              && std::isxdigit(static_cast<unsigned char>(row_string[backslash_pos + 2]))) {
            // Hexadecimal char literal: digits start after the 'x'.
            field_string->push_back(ParseHexLiteral(row_string, &next_scan_pos));
          } else {
            // Just an escaped 'x' with no hex digits.
            field_string->push_back('x');
          }
          break;
        default:
          // Append escaped character as-is.
          field_string->push_back(escaped_char);
          break;
      }
      scan_pos = next_scan_pos;
    }
  }

  // A field without a terminator is malformed input (e.g. the row's final
  // terminator was consumed as an escaped literal by a trailing backslash).
  // Report it explicitly: a debug-only check would let release builds compute
  // 'npos + 1', which wraps '*start_pos' around to 0 and makes the caller
  // re-parse the row from its beginning.
  if (terminator_pos == std::string::npos) {
    throw TextScanFormatError("Field ended without a field terminator");
  }

  field_string->append(row_string, scan_pos, terminator_pos - scan_pos);
  *start_pos = terminator_pos + 1;
  return true;
}
// Splits 'row_string' into fields and converts each one to a TypedValue using
// the type of the corresponding attribute of 'relation', in attribute order.
// A NULL literal field yields the attribute type's null value.
//
// Throws TextScanFormatError if the number of fields does not match the number
// of attributes, if a field cannot be parsed as its attribute's type, or if a
// NULL literal is given for a non-nullable attribute.
Tuple TextScanWorkOrder::parseRow(const std::string &row_string, const CatalogRelationSchema &relation) const {
  std::vector<TypedValue> parsed_values;
  std::string field_text;

  CatalogRelationSchema::const_iterator attribute = relation.begin();
  for (std::size_t cursor = 0; cursor < row_string.length(); ++attribute) {
    if (attribute == relation.end()) {
      throw TextScanFormatError("Row has too many fields");
    }

    field_text.clear();
    if (!extractFieldString(row_string, &cursor, &field_text)) {
      // The field was the NULL literal.
      if (!attribute->getType().isNullable()) {
        throw TextScanFormatError(
            "NULL literal '\\N' was specified for a column with a "
            "non-nullable Type");
      }
      parsed_values.emplace_back(attribute->getType().makeNullValue());
      continue;
    }

    // Ordinary field: parse the text in place into a freshly appended value.
    parsed_values.emplace_back();
    if (!attribute->getType().parseValueFromString(field_text, &(parsed_values.back()))) {
      throw TextScanFormatError("Failed to parse value");
    }
  }

  if (attribute != relation.end()) {
    throw TextScanFormatError("Row has too few fields");
  }
  return Tuple(std::move(parsed_values));
}
// Reads the input file in blob-sized chunks and hands each filled blob to the
// operator via sendBlobInfoToOperator(). Every chunk except the final one is
// sent row-aligned. Once the whole file has been consumed, a
// kSplitWorkOrderCompletionMessage is sent to the scheduler.
//
// Throws TextScanReadError if the file cannot be opened or if a read fails.
void TextSplitWorkOrder::execute() {
  // RAII ownership of the handle so it is closed on every exit path,
  // including the exceptions thrown below.
  struct FileCloser {
    void operator()(std::FILE *handle) const { std::fclose(handle); }
  };
  std::unique_ptr<std::FILE, FileCloser> file(std::fopen(filename_.c_str(), "r"));
  if (!file) {
    throw TextScanReadError(filename_);
  }

  bool eof = false;
  do {
    // Allocate new blob, if current is empty.
    if (0 == remainingBlobBytes()) {
      allocateBlob();
    }

    // Read into the unwritten part of the blob.
    const std::size_t bytes_requested = remainingBlobBytes();
    const std::size_t bytes =
        std::fread(writeableBlobAddress(), 1, bytes_requested, file.get());

    // A short read means either end-of-file or an I/O error. Only the former
    // may end the split normally; treating an error as EOF would silently
    // truncate the loaded data.
    if ((bytes < bytes_requested) && std::ferror(file.get())) {
      throw TextScanReadError(filename_);
    }
    eof = bytes < bytes_requested;
    written_ += bytes;

    // Write the current blob to queue for processing.
    sendBlobInfoToOperator(!eof /* write_row_aligned */);
  } while (!eof);

  // Close the file before announcing completion.
  file.reset();

  // Notify the operator about the completion of this Work Order.
  FeedbackMessage msg(TextScanOperator::kSplitWorkOrderCompletionMessage,
                      operator_index_,
                      nullptr /* payload */,
                      0 /* payload_size */,
                      false /* ownership */);
  SendFeedbackMessage(bus_, ClientIDMap::Instance()->getValue(), scheduler_client_id_, msg);
}
// Allocate new blob: creates a blob of FLAGS_textscan_split_blob_size slots
// through the storage manager, makes it the current blob (text_blob_id_ /
// text_blob_), records its size in blob_size_, and resets the count of bytes
// written to zero.
void TextSplitWorkOrder::allocateBlob() {
text_blob_id_ = storage_manager_->createBlob(FLAGS_textscan_split_blob_size);
text_blob_ = storage_manager_->getBlobMutable(text_blob_id_);
blob_size_ = text_blob_->size();
written_ = 0;
}
// Find the last row terminator in the written portion of the current blob.
// Returns the offset just past that terminator, i.e. the number of leading
// bytes that form complete rows. It is a fatal error (CHECK) if the written
// bytes contain no row terminator at all.
std::size_t TextSplitWorkOrder::findLastRowTerminator() {
  const char *blob = static_cast<const char *>(text_blob_->getMemory());

  // Walk backwards from the end of the written bytes until some prefix ends
  // in a row terminator, or until nothing is left to inspect.
  std::size_t found = written_;
  while ((found != 0)
         && (DetectRowTerminator(blob, found, process_escape_sequences_) == 0)) {
    --found;
  }

  // TODO(quickstep-team): Design a way to handle long rows that are larger than
  // the configured blob size.
  CHECK_NE(0u, found) << "No row terminator found in " << FLAGS_textscan_split_blob_size
                      << "-slot chunk of " << filename_;
  return found;
}
// Hands the current blob over to the operator and tells the scheduler that new
// work is available.
//
// If 'write_row_aligned' is true, only the bytes up to and including the last
// row terminator are reported as the blob's text; the bytes after it (the
// start of an incomplete row) are saved and copied into a freshly allocated
// blob at the end, so that the next read continues that row. If it is false,
// all written bytes are reported.
//
// Two messages are sent to scheduler_client_id_: a kNewTextBlobMessage
// feedback message carrying a serialized TextBlob proto (blob id and text
// size), followed by a kWorkOrdersAvailableMessage. Failure to send the second
// message is fatal (CHECK).
void TextSplitWorkOrder::sendBlobInfoToOperator(const bool write_row_aligned) {
std::size_t text_len = written_;
std::string residue;
if (write_row_aligned) {
// Find last row terminator in current blob.
text_len = findLastRowTerminator();
// Copy the residual bytes after the last row terminator. This must happen
// before the blob is announced below, since the copy is what gets carried
// over into the next blob.
residue = std::string(
static_cast<char *>(text_blob_->getMemoryMutable()) + text_len,
written_ - text_len);
}
// Notify the operator for the split-up blob.
serialization::TextBlob proto;
proto.set_blob_id(text_blob_id_);
proto.set_size(text_len);
const std::size_t payload_size = proto.ByteSize();
// NOTE(zuyu): 'payload' gets released by FeedbackMessage's destructor.
char *payload = static_cast<char *>(std::malloc(payload_size));
CHECK(proto.SerializeToArray(payload, payload_size));
const tmb::client_id worker_thread_client_id = ClientIDMap::Instance()->getValue();
FeedbackMessage feedback_msg(TextScanOperator::kNewTextBlobMessage,
operator_index_,
payload,
payload_size);
SendFeedbackMessage(bus_, worker_thread_client_id, scheduler_client_id_, feedback_msg);
// Notify Foreman of the available work order on the blob.
serialization::WorkOrdersAvailableMessage message_proto;
message_proto.set_operator_index(operator_index_);
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const size_t message_proto_length = message_proto.ByteSize();
char *message_proto_bytes = static_cast<char*>(std::malloc(message_proto_length));
CHECK(message_proto.SerializeToArray(message_proto_bytes, message_proto_length));
tmb::TaggedMessage tagged_message(static_cast<const void *>(message_proto_bytes),
message_proto_length,
kWorkOrdersAvailableMessage);
// The serialization buffer is released right after the TaggedMessage is
// built; presumably the message keeps its own copy of the bytes (confirm
// against tmb::TaggedMessage).
std::free(message_proto_bytes);
// Send new work order available message to Foreman.
const tmb::MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(
bus_,
worker_thread_client_id,
scheduler_client_id_,
std::move(tagged_message));
CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could not "
"be sent from thread with TMB client ID "
<< worker_thread_client_id << " to Foreman with TMB client "
"ID " << scheduler_client_id_;
if (residue.size()) {
// Allocate new blob, and copy residual bytes from last blob.
allocateBlob();
std::memcpy(writeableBlobAddress(), residue.data(), residue.size());
written_ += residue.size();
}
}
} // namespace quickstep