blob: a394c18deb5934ba199daee945ff72792dbb54c1 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef KUDU_TPCH_LINE_ITEM_TSV_IMPORTER_H
#define KUDU_TPCH_LINE_ITEM_TSV_IMPORTER_H
#include <fstream>
#include <vector>
#include <string>
#include "kudu/benchmarks/tpch/tpch-schemas.h"
#include "kudu/common/partial_row.h"
#include "kudu/gutil/strings/numbers.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/stringpiece.h"
#include "kudu/util/status.h"
namespace kudu {
static const char* const kPipeSeparator = "|";
// Utility class used to parse the lineitem tsv file
class LineItemTsvImporter {
public:
explicit LineItemTsvImporter(const std::string &path) : in_(path.c_str()),
updated_(false) {
CHECK(in_.is_open()) << "not able to open input file: " << path;
}
bool HasNextLine() {
if (!updated_) {
done_ = !getline(in_, line_);
updated_ = true;
}
return !done_;
}
// Fills the row builder with a single line item from the file.
// It returns 0 if it's done or the order number if it got a line
int GetNextLine(KuduPartialRow* row) {
if (!HasNextLine()) return 0;
columns_.clear();
// grab all the columns_ individually
// Note that columns_ refers, and does not copy, the data in line_
columns_ = strings::Split(line_, kPipeSeparator);
// The row copies all indirect data from columns_. This must be done
// because callers expect to retrieve lines repeatedly before flushing
// the accumulated rows in a batch.
int i = 0;
int order_number = ConvertToInt64AndPopulate(columns_[i++], row, tpch::kOrderKeyColIdx);
ConvertToIntAndPopulate(columns_[i++], row, tpch::kPartKeyColIdx);
ConvertToIntAndPopulate(columns_[i++], row, tpch::kSuppKeyColIdx);
ConvertToIntAndPopulate(columns_[i++], row, tpch::kLineNumberColIdx);
ConvertToIntAndPopulate(columns_[i++], row, tpch::kQuantityColIdx);
ConvertToDoubleAndPopulate(columns_[i++], row, tpch::kExtendedPriceColIdx);
ConvertToDoubleAndPopulate(columns_[i++], row, tpch::kDiscountColIdx);
ConvertToDoubleAndPopulate(columns_[i++], row, tpch::kTaxColIdx);
CHECK_OK(row->SetStringCopy(tpch::kReturnFlagColIdx, columns_[i++]));
CHECK_OK(row->SetStringCopy(tpch::kLineStatusColIdx, columns_[i++]));
CHECK_OK(row->SetStringCopy(tpch::kShipDateColIdx, columns_[i++]));
CHECK_OK(row->SetStringCopy(tpch::kCommitDateColIdx, columns_[i++]));
CHECK_OK(row->SetStringCopy(tpch::kReceiptDateColIdx, columns_[i++]));
CHECK_OK(row->SetStringCopy(tpch::kShipInstructColIdx, columns_[i++]));
CHECK_OK(row->SetStringCopy(tpch::kShipModeColIdx, columns_[i++]));
CHECK_OK(row->SetStringCopy(tpch::kCommentColIdx, columns_[i++]));
updated_ = false;
return order_number;
}
private:
int ConvertToInt64AndPopulate(const StringPiece &chars, KuduPartialRow* row,
int col_idx) {
// TODO: extra copy here, since we don't have a way to parse StringPiece
// into ints.
chars.CopyToString(&tmp_);
int64_t number;
bool ok_parse = safe_strto64(tmp_.c_str(), &number);
CHECK(ok_parse) << "Bad integer in column " << col_idx
<< ": '" << tmp_ << "'";
CHECK_OK(row->SetInt64(col_idx, number));
return number;
}
int ConvertToIntAndPopulate(const StringPiece &chars, KuduPartialRow* row,
int col_idx) {
// TODO: extra copy here, since we don't have a way to parse StringPiece
// into ints.
chars.CopyToString(&tmp_);
int number;
bool ok_parse = SimpleAtoi(tmp_.c_str(), &number);
CHECK(ok_parse) << "Bad integer in column " << col_idx
<< ": '" << tmp_ << "'";
CHECK_OK(row->SetInt32(col_idx, number));
return number;
}
void ConvertToDoubleAndPopulate(const StringPiece &chars, KuduPartialRow* row,
int col_idx) {
// TODO: extra copy here, since we don't have a way to parse StringPiece
// into ints.
chars.CopyToString(&tmp_);
char *error = NULL;
errno = 0;
const char *cstr = tmp_.c_str();
double number = strtod(cstr, &error);
CHECK(errno == 0 && // overflow/underflow happened
error != cstr) << "Bad double in column " << col_idx
<< ": '" << tmp_ << "': errno=" << errno;
CHECK_OK(row->SetDouble(col_idx, number));
}
std::ifstream in_;
std::vector<StringPiece> columns_;
std::string line_, tmp_;
bool updated_, done_;
};
} // namespace kudu
#endif