blob: 7ef042ed431a966f1a46d210ef15f1881cfdeb11 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#![allow(bare_trait_objects)]
use arrow::array::*;
use arrow::datatypes::{DataType, TimeUnit};
use clap::{crate_version, App, Arg};
use datafusion::error::{ExecutionError, Result};
use datafusion::execution::context::ExecutionContext;
use datafusion::execution::relation::Relation;
use prettytable::{Cell, Row, Table};
use rustyline::Editor;
use std::cell::RefMut;
use std::env;
use std::path::Path;
pub fn main() {
let matches = App::new("DataFusion")
.version(crate_version!())
.about(
"DataFusion is an in-memory query engine that uses Apache Arrow \
as the memory model. It supports executing SQL queries against CSV and \
Parquet files as well as querying directly against in-memory data.",
)
.arg(
Arg::with_name("data-path")
.help("Path to your data, default to current directory")
.short("p")
.long("data-path")
.takes_value(true),
)
.arg(
Arg::with_name("batch-size")
.help("The batch size of each query, default value is 1048576")
.short("c")
.long("batch-size")
.takes_value(true),
)
.get_matches();
if let Some(path) = matches.value_of("data-path") {
let p = Path::new(path);
env::set_current_dir(&p).unwrap();
};
let batch_size = matches
.value_of("batch-size")
.map(|size| size.parse::<usize>().unwrap())
.unwrap_or(1_048_576);
let mut ctx = ExecutionContext::new();
let mut rl = Editor::<()>::new();
rl.load_history(".history").ok();
let mut query = "".to_owned();
loop {
let readline = rl.readline("> ");
match readline {
Ok(ref line) if line.trim_end().ends_with(';') => {
query.push_str(line.trim_end());
rl.add_history_entry(query.clone());
match exec_and_print(&mut ctx, query, batch_size) {
Ok(_) => {}
Err(err) => println!("{:?}", err),
}
query = "".to_owned();
}
Ok(ref line) => {
query.push_str(line);
query.push_str(" ");
}
Err(_) => {
break;
}
}
}
rl.save_history(".history").ok();
}
fn exec_and_print(
ctx: &mut ExecutionContext,
sql: String,
batch_size: usize,
) -> Result<()> {
let relation = ctx.sql(&sql, batch_size)?;
print_result(relation.borrow_mut())?;
Ok(())
}
fn print_result(mut results: RefMut<Relation>) -> Result<()> {
let mut row_count = 0;
let mut table = Table::new();
let schema = results.schema();
let mut header = Vec::new();
for field in schema.fields() {
header.push(Cell::new(&field.name()));
}
table.add_row(Row::new(header));
while let Some(batch) = results.next().unwrap() {
row_count += batch.num_rows();
for row in 0..batch.num_rows() {
let mut cells = Vec::new();
for col in 0..batch.num_columns() {
let column = batch.column(col);
cells.push(Cell::new(&str_value(column.clone(), row)?));
}
table.add_row(Row::new(cells));
}
}
table.printstd();
if row_count > 1 {
println!("{} rows in set.", row_count);
} else {
println!("{} row in set.", row_count);
}
Ok(())
}
macro_rules! make_string {
($array_type:ty, $column: ident, $row: ident) => {{
Ok($column
.as_any()
.downcast_ref::<$array_type>()
.unwrap()
.value($row)
.to_string())
}};
}
fn str_value(column: ArrayRef, row: usize) -> Result<String> {
match column.data_type() {
DataType::Utf8 => Ok(column
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap()
.get_string(row)),
DataType::Boolean => make_string!(BooleanArray, column, row),
DataType::Int16 => make_string!(Int16Array, column, row),
DataType::Int32 => make_string!(Int32Array, column, row),
DataType::Int64 => make_string!(Int64Array, column, row),
DataType::UInt8 => make_string!(UInt8Array, column, row),
DataType::UInt16 => make_string!(UInt16Array, column, row),
DataType::UInt32 => make_string!(UInt32Array, column, row),
DataType::UInt64 => make_string!(UInt64Array, column, row),
DataType::Float16 => make_string!(Float32Array, column, row),
DataType::Float32 => make_string!(Float32Array, column, row),
DataType::Float64 => make_string!(Float64Array, column, row),
DataType::Timestamp(unit) if *unit == TimeUnit::Second => {
make_string!(TimestampSecondArray, column, row)
}
DataType::Timestamp(unit) if *unit == TimeUnit::Millisecond => {
make_string!(TimestampMillisecondArray, column, row)
}
DataType::Timestamp(unit) if *unit == TimeUnit::Microsecond => {
make_string!(TimestampMicrosecondArray, column, row)
}
DataType::Timestamp(unit) if *unit == TimeUnit::Nanosecond => {
make_string!(TimestampNanosecondArray, column, row)
}
DataType::Date32(_) => make_string!(Date32Array, column, row),
DataType::Date64(_) => make_string!(Date64Array, column, row),
DataType::Time32(unit) if *unit == TimeUnit::Second => {
make_string!(Time32SecondArray, column, row)
}
DataType::Time32(unit) if *unit == TimeUnit::Millisecond => {
make_string!(Time32MillisecondArray, column, row)
}
DataType::Time32(unit) if *unit == TimeUnit::Microsecond => {
make_string!(Time64MicrosecondArray, column, row)
}
DataType::Time64(unit) if *unit == TimeUnit::Nanosecond => {
make_string!(Time64NanosecondArray, column, row)
}
_ => Err(ExecutionError::ExecutionError(format!(
"Unsupported {:?} type for repl.",
column.data_type()
))),
}
}