blob: 5acb63a4be6d05bf0cdafab80180c62b130b43e0 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <algorithm>
#include <climits>
#include <limits>
#include <memory>
#include <string>
#include <vector>
#include "cli/exit_codes.h"
#include "commands/commands.h"
#include "common/device_id.h"
#include "common/schema.h"
#include "format/result_set_format.h"
#include "reader/filter/tag_filter.h"
#include "reader/tsfile_reader.h"
namespace tsfile_cli {
namespace {
bool can_push_down_row_window(const ParsedArgs& args, long long offset,
long long limit) {
return !args.has_start && !args.has_end && offset <= INT_MAX &&
(limit < 0 || limit <= INT_MAX);
}
int to_reader_row_bound(long long value) {
return value < 0 ? -1 : static_cast<int>(value);
}
} // namespace
std::unique_ptr<storage::Filter> build_table_tag_filter(
const ParsedArgs& args, storage::TsFileReader& reader,
const std::string& table_name, std::ostream& err) {
if (!args.has_tag_filter) {
return std::unique_ptr<storage::Filter>();
}
auto schema = reader.get_table_schema(table_name);
if (!schema) {
err << "Error: no schema found for table " << table_name << "\n";
return std::unique_ptr<storage::Filter>();
}
storage::TagFilterBuilder builder(schema.get());
storage::Filter* filter = nullptr;
switch (args.tag_filter_op) {
case ParsedArgs::TagFilterOp::kEq:
filter = builder.eq(args.tag_filter_column, args.tag_filter_value);
break;
case ParsedArgs::TagFilterOp::kNeq:
filter = builder.neq(args.tag_filter_column, args.tag_filter_value);
break;
case ParsedArgs::TagFilterOp::kLt:
filter = builder.lt(args.tag_filter_column, args.tag_filter_value);
break;
case ParsedArgs::TagFilterOp::kLteq:
filter =
builder.lteq(args.tag_filter_column, args.tag_filter_value);
break;
case ParsedArgs::TagFilterOp::kGt:
filter = builder.gt(args.tag_filter_column, args.tag_filter_value);
break;
case ParsedArgs::TagFilterOp::kGteq:
filter =
builder.gteq(args.tag_filter_column, args.tag_filter_value);
break;
case ParsedArgs::TagFilterOp::kRegexp:
filter =
builder.reg_exp(args.tag_filter_column, args.tag_filter_value);
break;
case ParsedArgs::TagFilterOp::kNotRegexp:
filter = builder.not_reg_exp(args.tag_filter_column,
args.tag_filter_value);
break;
case ParsedArgs::TagFilterOp::kBetween:
filter = builder.between_and(args.tag_filter_column,
args.tag_filter_value,
args.tag_filter_value2);
break;
case ParsedArgs::TagFilterOp::kNotBetween:
filter = builder.not_between_and(args.tag_filter_column,
args.tag_filter_value,
args.tag_filter_value2);
break;
case ParsedArgs::TagFilterOp::kNone:
break;
}
if (filter == nullptr) {
err << "Error: invalid tag filter column '" << args.tag_filter_column
<< "' for table " << table_name << "\n";
return std::unique_ptr<storage::Filter>();
}
return std::unique_ptr<storage::Filter>(filter);
}
std::vector<std::string> collect_tree_query_paths(
const ParsedArgs& args, storage::TsFileReader& reader) {
std::vector<std::string> paths;
const bool has_projection = !args.measurements.empty();
auto include_measurement = [&](const std::string& m) {
return !has_projection ||
std::find(args.measurements.begin(), args.measurements.end(),
m) != args.measurements.end();
};
if (!args.device.empty()) {
// A single device was requested: resolve its series and keep only the
// ones matching the projection. Filtering against the device's real
// schema means a provided measurement that doesn't exist on the device
// is dropped rather than queried blindly (matching the no-device path).
auto did = std::make_shared<storage::StringArrayDeviceID>(args.device);
std::vector<storage::MeasurementSchema> sch;
if (reader.get_timeseries_schema(did, sch) == 0) {
for (auto& m : sch) {
if (include_measurement(m.measurement_name_)) {
paths.push_back(args.device + "." + m.measurement_name_);
}
}
}
return paths;
}
// No device filter: collect every device/series from a single whole-file
// metadata call instead of querying each device one by one.
storage::DeviceTimeseriesMetadataMap meta =
reader.get_timeseries_metadata();
for (auto& kv : meta) {
if (!kv.first) {
continue;
}
const std::string dev = kv.first->get_device_name();
for (auto& ts : kv.second) {
if (!ts) {
continue;
}
const std::string m = ts->get_measurement_name().to_std_string();
if (include_measurement(m)) {
paths.push_back(dev + "." + m);
}
}
}
return paths;
}
int run_row_query(const ParsedArgs& args, storage::TsFileReader& reader,
OutputFormat fmt, std::ostream& out, std::ostream& err,
long long offset, long long limit) {
const int64_t start = args.has_start ? static_cast<int64_t>(args.start)
: std::numeric_limits<int64_t>::min();
const int64_t end = args.has_end ? static_cast<int64_t>(args.end)
: std::numeric_limits<int64_t>::max();
storage::ResultSet* rs = nullptr;
int qret = 0;
const bool push_down = can_push_down_row_window(args, offset, limit);
std::unique_ptr<storage::Filter> tag_filter;
if (is_table_model(args, reader)) {
std::string table_name = args.table;
if (table_name.empty()) {
auto schemas = reader.get_all_table_schemas();
if (schemas.empty() || !schemas[0]) {
err << "Error: no table found in file\n";
return kExitRuntime;
}
table_name = schemas[0]->get_table_name();
}
std::vector<std::string> cols = args.measurements;
if (cols.empty()) {
auto ts = reader.get_table_schema(table_name);
if (ts) {
cols = ts->get_measurement_names();
}
}
tag_filter = build_table_tag_filter(args, reader, table_name, err);
if (args.has_tag_filter && tag_filter == nullptr) {
return kExitUsage;
}
if (push_down) {
qret = reader.queryByRow(table_name, cols,
to_reader_row_bound(offset),
to_reader_row_bound(limit), rs,
tag_filter.get());
} else {
qret = reader.query(table_name, cols, start, end, rs,
tag_filter.get());
}
} else {
if (args.has_tag_filter) {
err << "Error: tag filter flags are only valid for table model\n";
return kExitUsage;
}
std::vector<std::string> paths = collect_tree_query_paths(args, reader);
if (paths.empty()) {
err << "Error: no time series found\n";
return kExitRuntime;
}
if (push_down) {
qret = reader.queryByRow(paths, to_reader_row_bound(offset),
to_reader_row_bound(limit), rs);
} else {
qret = reader.query(paths, start, end, rs);
}
}
if (qret != 0 || rs == nullptr) {
err << "Error: query failed: " << error_code_message(qret) << "\n";
if (rs != nullptr) {
reader.destroy_query_data_set(rs);
}
return kExitRuntime;
}
int wret = push_down
? emit_result_set(rs, fmt, args.no_header, out)
: emit_result_set(rs, fmt, args.no_header, out, offset,
limit);
reader.destroy_query_data_set(rs);
if (wret != 0) {
err << "Error: failed to read rows: " << error_code_message(wret)
<< "\n";
return kExitRuntime;
}
return kExitOk;
}
} // namespace tsfile_cli