// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This utility will first try to load the data from the given path if the
// tablet doesn't already exist at the given location. It will then run
// the tpch1 query, as described below, up to tpch_num_query_iterations times.
// The input data must be in the tpch format, separated by "|".
// Usage:
// tpch1 -tpch_path_to_data=/home/jdcryans/lineitem.tbl
// -tpch_num_query_iterations=1
// -tpch_expected_matching_rows=12345
// From Impala:
// ====
// ---- QUERY : TPCH-Q1
// # Q1 - Pricing Summary Report Query
// # Modifications: Remove ORDER BY
// select
// l_returnflag,
// l_linestatus,
// round(sum(l_quantity), 1),
// round(sum(l_extendedprice), 1),
// round(sum(l_extendedprice * (1 - l_discount)), 1),
// round(sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)), 1),
// round(avg(l_quantity), 1),
// round(avg(l_extendedprice), 1),
// round(avg(l_discount), 1), count(1)
// from
// lineitem
// where
// l_shipdate<='1998-09-02'
// group by
// l_returnflag,
// l_linestatus
// ---- TYPES
// string, string, double, double, double, double, double, double, double, bigint
// ---- RESULTS
// 'A','F',37734107,56586554400.73,53758257134.9,55909065222.8,25.5,38273.1,0,1478493
// 'N','F',991417,1487504710.38,1413082168.1,1469649223.2,25.5,38284.5,0.1,38854
// 'N','O',74476040,111701729697.74,106118230307.6,110367043872.5,25.5,38249.1,0,2920374
// 'R','F',37719753,56568041380.90,53741292684.6,55889619119.8,25.5,38250.9,0.1,1478870
// ====
#include <cstdint>
#include <cstdlib>
#include <functional>
#include <memory>
#include <ostream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/benchmarks/tpch/line_item_tsv_importer.h"
#include "kudu/benchmarks/tpch/rpc_line_item_dao.h"
#include "kudu/client/row_result.h"
#include "kudu/codegen/compilation_manager.h"
#include "kudu/gutil/hash/city.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/master/mini_master.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/util/env.h"
#include "kudu/util/flags.h"
#include "kudu/util/logging.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
namespace kudu {
class KuduPartialRow;
} // namespace kudu
// Command-line flags for this benchmark: where the input data lives, how many
// times the query runs, whether/what row count to verify, which cluster to use
// (embedded mini cluster vs. an existing master), and the write batch size.
// NOTE(review): main() also reads FLAGS_table_name, which is not defined among
// the flags below — presumably defined or declared elsewhere; confirm.
DEFINE_string(tpch_path_to_data, "/tmp/lineitem.tbl",
"The full path to the '|' separated file containing the lineitem table.");
DEFINE_int32(tpch_num_query_iterations, 1, "Number of times the query will be run.");
DEFINE_int32(tpch_expected_matching_rows, 5916591, "Number of rows that should match the query.");
DEFINE_bool(tpch_check_matching_rows, true, "Whether to check the number of matching rows.");
DEFINE_bool(use_mini_cluster, true,
"Create a mini cluster for the work to be performed against.");
DEFINE_string(mini_cluster_base_dir, "/tmp/tpch",
"If using a mini cluster, directory for master/ts data.");
DEFINE_string(master_address, "localhost",
"Address of master for the cluster to operate on");
DEFINE_int32(tpch_max_batch_size, 1000,
"Maximum number of inserts/updates to batch at once. Set to 0 "
"to delegate the batching control to the logic of the "
"KuduSession running in AUTO_BACKGROUND_MODE flush mode.");
using kudu::client::KuduRowResult;
using std::string;
using std::unique_ptr;
using std::unordered_map;
using std::vector;
namespace kudu {
// Running aggregates for one (l_returnflag, l_linestatus) group of the query.
// Every field starts at zero so that a freshly created Result can be
// accumulated into with '+=' right away.
struct Result {
  int32_t l_quantity = 0;      // Sum of l_quantity.
  double l_extendedprice = 0;  // Sum of l_extendedprice.
  double l_discount = 0;       // Sum of l_discount.
  double l_tax = 0;            // Sum of l_tax.
  int count = 0;               // Divisor used when the averages are computed.
};
// This struct is used for the keys while running the GROUP BY instead of manipulating strings
// It wraps a Slice and defines equality on it; hashing is provided separately
// by the Hash functor, so the pair can be used with unordered_map.
struct SliceMapKey {
// The slice this key compares and hashes by.
Slice slice;
// This copies the string out of the result buffer
// NOTE(review): only the allocation of a slice.size()-byte buffer is present
// here; no statement copies into 'buf' or re-points 'slice' at it, and the
// closing brace is absent. Confirm whether lines were lost.
void RelocateSlice() {
auto* buf = new uint8_t[slice.size()];
// Two keys are equal when their slices compare equal.
bool operator==(const SliceMapKey &other_key) const {
return slice == other_key.slice;
// Hash functor that lets SliceMapKey be used as an unordered_map key.
// Hashes the key's slice (data pointer + length) with CityHash64.
struct Hash {
  size_t operator()(const SliceMapKey &key) const {
    // The cast adapts the slice's data pointer to the 'const char *' that
    // CityHash64 is called with; the length comes from the same slice.
    return util_hash::CityHash64(
        reinterpret_cast<const char *>(key.slice.data()), key.slice.size());
  }
};
// Reads line items from the file at 'path' through a LineItemTsvImporter.
// 'f' fills a KuduPartialRow from the importer's next input line, and the loop
// runs for as long as the importer reports that more lines are available.
// NOTE(review): the loop body is not present here and neither 'f' nor 'dao' is
// used in the visible lines; presumably each row is written through 'dao' —
// confirm against the complete function.
void LoadLineItems(const string &path, RpcLineItemDAO *dao) {
LineItemTsvImporter importer(path);
// Captures the importer by reference, so 'f' must not outlive this function.
auto f = [&importer](KuduPartialRow* row) { importer.GetNextLine(row); };
while (importer.HasNextLine()) {
// Cache warm-up step for the tpch1 query, operating on 'dao'.
// NOTE(review): only the scanner declaration is present; nothing in the
// visible lines opens a scanner on 'dao' or waits on anything — confirm
// against the complete function.
void WarmupScanCache(RpcLineItemDAO* dao) {
// Warms up cache for the tpch1 query.
unique_ptr<RpcLineItemDAO::Scanner> scanner;
// Runs the TPC-H Q1 aggregation on the client side: rows obtained from a
// scanner are grouped by (l_returnflag, l_linestatus), per-group sums are
// accumulated in heap-allocated Result objects, one log line is emitted per
// group, the allocations are freed, and, if enabled by flag, the number of
// matching rows is checked against the expected value.
// NOTE(review): several statements and closing braces appear to be missing
// from this function as it stands — nothing visible opens 'scanner', fills
// 'rows', or increments 'matching_rows' / 'r->count'. Confirm against the
// complete function before relying on the comments below.
void Tpch1(RpcLineItemDAO *dao) {
// Two-level grouping: l_returnflag -> (l_linestatus -> aggregates).
// Both levels hold raw owning pointers that are deleted after logging.
typedef unordered_map<SliceMapKey, Result*, Hash> slice_map;
typedef unordered_map<SliceMapKey, slice_map*, Hash> slice_map_map;
unique_ptr<RpcLineItemDAO::Scanner> scanner;
int matching_rows = 0;
slice_map_map results;
Result *r;
vector<KuduRowResult> rows;
while (scanner->HasMore()) {
for (const KuduRowResult& row : rows) {
// Pull the grouping keys (columns 1 and 2) and the measures (columns 3-6)
// out of the row; any accessor failure aborts via CHECK_OK.
SliceMapKey l_returnflag;
CHECK_OK(row.GetString(1, &l_returnflag.slice));
SliceMapKey l_linestatus;
CHECK_OK(row.GetString(2, &l_linestatus.slice));
int32_t l_quantity;
CHECK_OK(row.GetInt32(3, &l_quantity));
double l_extendedprice;
CHECK_OK(row.GetDouble(4, &l_extendedprice));
double l_discount;
CHECK_OK(row.GetDouble(5, &l_discount));
double l_tax;
CHECK_OK(row.GetDouble(6, &l_tax));
// Find or create the inner map for this l_returnflag.
slice_map *linestatus_map;
auto it = results.find(l_returnflag);
if (it == results.end()) {
linestatus_map = new slice_map;
results[l_returnflag] = linestatus_map;
} else {
linestatus_map = it->second;
// Find or create the aggregate for this l_linestatus within that map.
auto inner_it = linestatus_map->find(l_linestatus);
if (inner_it == linestatus_map->end()) {
r = new Result();
(*linestatus_map)[l_linestatus] = r;
} else {
r = inner_it->second;
// Fold this row's measures into the group's running sums.
r->l_quantity += l_quantity;
r->l_extendedprice += l_extendedprice;
r->l_discount += l_discount;
r->l_tax += l_tax;
// Report one line per (l_returnflag, l_linestatus) group.
LOG(INFO) << "Result: ";
for (const auto& result : results) {
const SliceMapKey returnflag = result.first;
const auto* maps = result.second;
for (const auto& map : *maps) {
const SliceMapKey linestatus = map.first;
Result* r = map.second;
// Averages are the accumulated sums divided by the group's count.
double avg_q = static_cast<double>(r->l_quantity) / r->count;
double avg_ext_p = r->l_extendedprice / r->count;
double avg_discount = r->l_discount / r->count;
LOG(INFO) << returnflag.slice.ToString() << ", " <<
linestatus.slice.ToString() << ", " <<
r->l_quantity << ", " <<
StringPrintf("%.2f", r->l_extendedprice) << ", " <<
// TODO those two are missing at the moment, might want to change Result
// sum(l_extendedprice * (1 - l_discount))
// sum(l_extendedprice * (1 - l_discount) * (1 + l_tax))
StringPrintf("%.2f", avg_q) << ", " <<
StringPrintf("%.2f", avg_ext_p) << ", " <<
StringPrintf("%.2f", avg_discount) << ", " <<
// Release the per-group aggregate, then the inner map that held it.
delete r;
delete maps;
// Optional sanity check on the number of rows the query matched.
if (FLAGS_tpch_check_matching_rows) {
CHECK_EQ(matching_rows, FLAGS_tpch_expected_matching_rows) << "Wrong number of rows returned";
} // namespace kudu
// Entry point: parses flags, picks the master address (from a locally created
// InternalMiniCluster or from --master_address), builds the line-item DAO,
// loads the data when the table is empty, then runs the timed query
// iterations.
// NOTE(review): several closing braces and the bodies of the per-iteration
// timing block and of the final 'if (cluster)' are not present in the visible
// lines — confirm against the complete function.
int main(int argc, char **argv) {
kudu::ParseCommandLineFlags(&argc, &argv, true);
kudu::Env* env;
unique_ptr<kudu::cluster::InternalMiniCluster> cluster;
string master_address;
if (FLAGS_use_mini_cluster) {
env = kudu::Env::Default();
// The base directory may already exist from a previous run; any other
// failure to create it is fatal.
kudu::Status s = env->CreateDir(FLAGS_mini_cluster_base_dir);
CHECK(s.IsAlreadyPresent() || s.ok()) << s.ToString();
kudu::cluster::InternalMiniClusterOptions options;
options.cluster_root = FLAGS_mini_cluster_base_dir;
cluster.reset(new kudu::cluster::InternalMiniCluster(env, options));
master_address = cluster->mini_master()->bound_rpc_addr_str();
} else {
master_address = FLAGS_master_address;
// Fall back to "lineitem" when no table name was supplied via flag.
string tpch_table_name = FLAGS_table_name.empty() ? "lineitem" : FLAGS_table_name;
unique_ptr<kudu::RpcLineItemDAO> dao(new kudu::RpcLineItemDAO(
master_address, std::move(tpch_table_name), FLAGS_tpch_max_batch_size,
/* timeout_ms = */ 5000, kudu::RpcLineItemDAO::RANGE,
/* num_buckets = */ 1));
// Only import the data file when the table holds no rows yet.
bool needs_loading = dao->IsTableEmpty();
if (needs_loading) {
LOG_TIMING(INFO, "loading") {
kudu::LoadLineItems(FLAGS_tpch_path_to_data, dao.get());
} else {
LOG(INFO) << "Data already in place";
// Each iteration is wrapped in LOG_TIMING so its duration is logged.
for (int i = 0; i < FLAGS_tpch_num_query_iterations; i++) {
LOG_TIMING(INFO, StringPrintf("querying for iteration # %d", i)) {
// 'cluster' is non-null only when the mini cluster branch above was taken.
if (cluster) {
return 0;