blob: f84d485b4a59c67651221b831c574966e7ff0860 [file] [log] [blame]
/**
* Copyright 2011-2015 Quickstep Technologies LLC.
* Copyright 2015-2016 Pivotal Software, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
#ifndef QUICKSTEP_RELATIONAL_OPERATORS_TEXT_SCAN_OPERATOR_HPP_
#define QUICKSTEP_RELATIONAL_OPERATORS_TEXT_SCAN_OPERATOR_HPP_
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <exception>
#include <string>
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "relational_operators/RelationalOperator.hpp"
#include "relational_operators/WorkOrder.hpp"
#include "storage/StorageBlob.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "types/containers/Tuple.hpp"
#include "utility/Macros.hpp"
#include "utility/ThreadSafeQueue.hpp"
#include "glog/logging.h"
#include "tmb/id_typedefs.h"
namespace tmb { class MessageBus; }
namespace quickstep {
class CatalogRelationSchema;
class InsertDestination;
class StorageManager;
class WorkOrdersContainer;
/** \addtogroup RelationalOperators
* @{
*/
/**
 * @brief Exception thrown when a text file can't be opened for reading.
 **/
class TextScanReadError : public std::exception {
 public:
  /**
   * @brief Constructor.
   *
   * @param filename Name of the file that could not be read; it is embedded
   *        verbatim in the message returned by what().
   **/
  explicit TextScanReadError(const std::string &filename)
      : message_("TextScanReadError: Unable to read file ") {
    message_.append(filename);
    message_.append(" for text scan");
  }

  // No user-declared destructor: the implicitly generated one is already
  // virtual (inherited from std::exception) and non-throwing.

  /**
   * @brief Get the human-readable error message.
   *
   * @return A NUL-terminated string owned by this exception object.
   **/
  const char* what() const noexcept override {
    return message_.c_str();
  }

 private:
  std::string message_;
};
/**
 * @brief Exception thrown when improperly formatted data is encountered while
 *        scanning a text file.
 **/
class TextScanFormatError : public std::exception {
 public:
  /**
   * @brief Constructor.
   *
   * @param reason Description of what was malformed; it is placed inside the
   *        trailing parentheses of the message returned by what().
   **/
  explicit TextScanFormatError(const std::string &reason)
      : message_("TextScanFormatError: Malformed data encountered during text scan (") {
    message_.append(reason);
    message_.push_back(')');
  }

  // No user-declared destructor: the implicitly generated one is already
  // virtual (inherited from std::exception) and non-throwing.

  /**
   * @brief Get the human-readable error message.
   *
   * @return A NUL-terminated string owned by this exception object.
   **/
  const char* what() const noexcept override {
    return message_.c_str();
  }

 private:
  std::string message_;
};
/**
 * @brief Plain descriptor pairing a text blob's ID with the size recorded
 *        for it.
 **/
struct TextBlob {
  /**
   * @brief Constructor.
   *
   * @param text_blob_id ID stored in blob_id.
   * @param text_size Size stored in size.
   **/
  TextBlob(const block_id text_blob_id, const std::size_t text_size)
      : blob_id{text_blob_id},
        size{text_size} {
  }

  // ID of the blob.
  block_id blob_id;

  // Size associated with the blob, as supplied at construction.
  std::size_t size;
};
/**
 * @brief Relational operator that loads tuples from text file(s) into a
 *        relation.
 **/
class TextScanOperator : public RelationalOperator {
 public:
  // Feedback message kinds used with receiveFeedbackMessage().
  // NOTE(review): judging by the names, these announce a newly produced text
  // blob and the completion of a split work order respectively; confirm
  // against the implementation file.
  enum FeedbackMessageType : WorkOrder::FeedbackMessageType {
    kNewTextBlobMessage,
    kSplitWorkOrderCompletionMessage,
  };

  /**
   * @brief Constructor
   *
   * @param file_pattern The glob-like file pattern of the sources to load. The
   *        pattern could include * (wildcard for multiple chars) and ?
   *        (wildcard for single char). It defaults to single file load, if a
   *        file is specified.
   * @param field_terminator The character which separates attribute values in
   *        the text file.
   * @param process_escape_sequences Whether to decode escape sequences in the
   *        text file.
   * @param parallelize_load Parallelize the load process by splitting the file
   *        into blobs, and generating separate work-orders for each of them.
   * @param output_relation The output relation.
   * @param output_destination_index The index of the InsertDestination in the
   *        QueryContext to insert tuples.
   **/
  TextScanOperator(
      const std::string &file_pattern,
      const char field_terminator,
      const bool process_escape_sequences,
      const bool parallelize_load,
      const CatalogRelation &output_relation,
      const QueryContext::insert_destination_id output_destination_index)
      : file_pattern_(file_pattern),
        field_terminator_(field_terminator),
        process_escape_sequences_(process_escape_sequences),
        parallelize_load_(parallelize_load),
        output_relation_(output_relation),
        output_destination_index_(output_destination_index) {
    // Counters and the work_generated_ flag start from their in-class
    // default member initializers below.
  }

  ~TextScanOperator() override = default;

  // Overrides the RelationalOperator hook; defined out of line.
  bool getAllWorkOrders(WorkOrdersContainer *container,
                        QueryContext *query_context,
                        StorageManager *storage_manager,
                        const tmb::client_id foreman_client_id,
                        const tmb::client_id agent_client_id,
                        tmb::MessageBus *bus) override;

  // Index of the InsertDestination (within the QueryContext) given at
  // construction.
  QueryContext::insert_destination_id getInsertDestinationID() const override {
    return output_destination_index_;
  }

  // ID of the output relation given at construction.
  const relation_id getOutputRelationID() const override {
    return output_relation_.getID();
  }

  // Overrides the RelationalOperator hook; defined out of line.
  void receiveFeedbackMessage(const WorkOrder::FeedbackMessage &msg) override;

 private:
  // Construction-time configuration; see the constructor documentation.
  const std::string file_pattern_;
  const char field_terminator_;
  const bool process_escape_sequences_;
  const bool parallelize_load_;
  const CatalogRelation &output_relation_;
  const QueryContext::insert_destination_id output_destination_index_;

  // Queue of text blob descriptors.
  ThreadSafeQueue<TextBlob> text_blob_queue_;

  // Split work order bookkeeping; both counters start at zero.
  std::atomic<std::uint32_t> num_done_split_work_orders_{0};
  std::uint32_t num_split_work_orders_ = 0;

  // For a non-parallel load: whether the work order that loads the file has
  // been generated. For a parallel load: whether the work order that splits
  // the file into blobs has been generated.
  bool work_generated_ = false;

  DISALLOW_COPY_AND_ASSIGN(TextScanOperator);
};
/**
 * @brief A WorkOrder produced by TextScanOperator.
 **/
class TextScanWorkOrder : public WorkOrder {
 public:
  /**
   * @brief Constructor for scanning a text file.
   *
   * @param filename The name of the text file to bulk insert.
   * @param field_terminator The character which separates attribute values in
   *        the text file.
   * @param process_escape_sequences Whether to decode escape sequences in the
   *        text file.
   * @param output_destination The InsertDestination to insert tuples.
   * @param storage_manager The StorageManager to use.
   **/
  TextScanWorkOrder(const std::string &filename,
                    const char field_terminator,
                    const bool process_escape_sequences,
                    InsertDestination *output_destination,
                    StorageManager *storage_manager);

  /**
   * @brief Constructor for scanning text held in a blob.
   *
   * @param text_blob Blob ID containing the data to be scanned.
   * @param text_size Size of the data in the blob.
   * @param field_terminator The character which separates attribute values in
   *        the text file.
   * @param process_escape_sequences Whether to decode escape sequences in the
   *        text file.
   * @param output_destination The InsertDestination to write the read tuples.
   * @param storage_manager The StorageManager to use.
   **/
  TextScanWorkOrder(const block_id text_blob,
                    const std::size_t text_size,
                    const char field_terminator,
                    const bool process_escape_sequences,
                    InsertDestination *output_destination,
                    StorageManager *storage_manager);

  ~TextScanWorkOrder() override = default;

  // FIXME(chasseur): Rollback if an exception is thrown after some tuples have
  // already been inserted.
  /**
   * @brief Run this work order.
   *
   * @exception TextScanReadError The text file could not be opened for
   *            reading.
   * @exception TextScanFormatError Malformed data was encountered in the
   *            text file.
   * @exception TupleTooLargeForBlock A tuple in the text file was too large
   *            to fit in a StorageBlock.
   **/
  void execute() override;

 private:
  // Interpret up to three octal digits (0-7) of 'row_string', beginning at
  // '*start_pos', as a char literal. On return '*start_pos' is the first
  // position AFTER the digits that were consumed.
  static char ParseOctalLiteral(const std::string &row_string,
                                std::size_t *start_pos);

  // Interpret up to two hexadecimal digits (0-F, case insensitive) of
  // 'row_string', beginning at '*start_pos', as a char literal. On return
  // '*start_pos' is the first position AFTER the digits that were consumed.
  static char ParseHexLiteral(const std::string &row_string,
                              std::size_t *start_pos);

  // Fetch the next text row from the open stream '*file' into '*row_string'.
  // Returns true when a row was read, false when end-of-file was hit and no
  // rows remain. To simplify parsing, the trailing row-terminator of
  // '*row_string' is stripped and a field-terminator is put in its place.
  bool readRowFromFile(std::FILE *file, std::string *row_string) const;

  // Fetch the next text row from blob memory, reading from '**start_pos' up
  // to '*end_pos', into '*row_string'. Returns true when a row was read,
  // false when the end of the blob was hit and no rows remain. To simplify
  // parsing, the trailing row-terminator of '*row_string' is stripped and a
  // field-terminator is put in its place. Afterwards '*start_pos' points to
  // the first character AFTER the row just read.
  bool readRowFromBlob(const char **start_pos,
                       const char *end_pos,
                       std::string *row_string) const;

  // Strip a row-terminator (newline, or carriage-return + newline) from the
  // end of '*row_string'. Returns true if one was removed, false if
  // '*row_string' did not end in a row-terminator.
  bool removeRowTerminator(std::string *row_string) const;

  // Pull the field that begins at '*start_pos' in 'row_string' out into
  // '*field_string', expanding escape sequences when
  // 'process_escape_sequences_' is true. Returns true if a field string was
  // extracted, false in the special case where the NULL-literal string "\N"
  // was found. Throws TextScanFormatError if the text was malformed.
  bool extractFieldString(const std::string &row_string,
                          std::size_t *start_pos,
                          std::string *field_string) const;

  // Build a tuple by parsing each individual field contained in
  // 'row_string'.
  Tuple parseRow(const std::string &row_string,
                 const CatalogRelationSchema &relation) const;

  const bool is_file_;
  const std::string filename_;
  const char field_terminator_;
  const block_id text_blob_;
  const std::size_t text_size_;
  const bool process_escape_sequences_;

  InsertDestination *output_destination_;
  StorageManager *storage_manager_;

  DISALLOW_COPY_AND_ASSIGN(TextScanWorkOrder);
};
/**
 * @brief A WorkOrder to split the file into blobs of text that can be processed
 *        separately.
 **/
class TextSplitWorkOrder : public WorkOrder {
 public:
  /**
   * @brief Constructor.
   *
   * @param filename File to split into row-aligned blobs.
   * @param process_escape_sequences Whether to decode escape sequences in the
   *        text file.
   * @param storage_manager The StorageManager to use. Must not be NULL.
   * @param operator_index Operator index of the current operator. This is used
   *        to send new-work available message to Foreman.
   * @param foreman_client_id The TMB client ID of the foreman thread.
   * @param agent_client_id The TMB client ID of the agent that sends messages
   *        to Foreman.
   * @param bus A pointer to the TMB. Must not be NULL.
   **/
  TextSplitWorkOrder(const std::string &filename,
                     const bool process_escape_sequences,
                     StorageManager *storage_manager,
                     const std::size_t operator_index,
                     const tmb::client_id foreman_client_id,
                     const tmb::client_id agent_client_id,
                     tmb::MessageBus *bus)
      : filename_(filename),
        process_escape_sequences_(process_escape_sequences),
        storage_manager_(DCHECK_NOTNULL(storage_manager)),
        operator_index_(operator_index),
        foreman_client_id_(foreman_client_id),
        agent_client_id_(agent_client_id),
        bus_(DCHECK_NOTNULL(bus)) {}

  /**
   * @brief Run this work order.
   *
   * @exception TextScanReadError The text file could not be opened for
   *            reading.
   **/
  void execute() override;

 private:
  // Allocate a new blob.
  void allocateBlob();

  // Find the last row terminator in current blob.
  std::size_t findLastRowTerminator();

  // Send the blob info to its operator via TMB.
  void sendBlobInfoToOperator(const bool write_row_aligned);

  // Get the writeable address (unwritten chunk) in current blob: the start of
  // the blob's mutable memory advanced by the number of bytes already written.
  inline char* writeableBlobAddress() {
    return static_cast<char*>(text_blob_->getMemoryMutable()) + written_;
  }

  // Number of bytes remaining to be written in the current blob.
  inline std::size_t remainingBlobBytes() const {
    return blob_size_ - written_;
  }

  const std::string filename_;  // File to split.
  const bool process_escape_sequences_;

  StorageManager *storage_manager_;

  const std::size_t operator_index_;  // Operator index.
  const tmb::client_id foreman_client_id_;
  const tmb::client_id agent_client_id_;
  tmb::MessageBus *bus_;

  MutableBlobReference text_blob_;  // Mutable reference to current blob.
  // Current blob ID. Value-initialized so that it never holds an
  // indeterminate value.
  block_id text_blob_id_{};
  std::size_t written_ = 0;    // Bytes written in current blob.
  std::size_t blob_size_ = 0;  // Size of the current blob.

  DISALLOW_COPY_AND_ASSIGN(TextSplitWorkOrder);
};
/** @} */
} // namespace quickstep
#endif // QUICKSTEP_RELATIONAL_OPERATORS_TEXT_SCAN_OPERATOR_HPP_