blob: aeb9f90587ab705505ee64778251c99a390258f9 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
class TestExecutePlan < Test::Unit::TestCase
include Helper::Buildable
include Helper::Omittable
def execute(plan)
plan.validate
plan.start
plan.wait
yield
plan.stop
end
sub_test_case("aggregate") do
def build_plan
plan = Arrow::ExecutePlan.new
record_batch =
build_record_batch(number: build_int8_array([1, 2, 3, 4, 5]),
string: build_string_array(["a", "b", "a", "b", "a"]))
source_node_options = Arrow::SourceNodeOptions.new(record_batch)
source_node = plan.build_source_node(source_node_options)
aggregate_node_options = yield
aggregate_node = plan.build_aggregate_node(source_node,
aggregate_node_options)
sink_node_options = Arrow::SinkNodeOptions.new
sink_node = plan.build_sink_node(aggregate_node,
sink_node_options)
[plan, sink_node_options.get_reader(aggregate_node.output_schema)]
end
def test_by_string
plan, reader = build_plan do
aggregations = [
Arrow::Aggregation.new("hash_sum", nil, "number", "sum(number)"),
Arrow::Aggregation.new("hash_count", nil, "number", "count(number)"),
]
Arrow::AggregateNodeOptions.new(aggregations, ["string"])
end
execute(plan) do
assert_equal(build_table("sum(number)" => build_int64_array([9, 6]),
"count(number)" => build_int64_array([3, 2]),
"string" => build_string_array(["a", "b"])),
reader.read_all)
end
end
end
sub_test_case("hash join") do
def build_plan
plan = Arrow::ExecutePlan.new
left_record_batch =
build_record_batch(number: build_int8_array([1, 2, 3, 4, 5]),
string: build_string_array(["a", "b", "a", "b", "a"]))
left_node_options = Arrow::SourceNodeOptions.new(left_record_batch)
left_node = plan.build_source_node(left_node_options)
right_record_batch =
build_record_batch(right_number: build_int8_array([1, 2]),
right_string: build_string_array(["R-1", "R-2"]))
right_node_options = Arrow::SourceNodeOptions.new(right_record_batch)
right_node = plan.build_source_node(right_node_options)
hash_join_node_options = yield
hash_join_node = plan.build_hash_join_node(left_node,
right_node,
hash_join_node_options)
sink_node_options = Arrow::SinkNodeOptions.new
sink_node = plan.build_sink_node(hash_join_node,
sink_node_options)
[plan, sink_node_options.get_reader(hash_join_node.output_schema)]
end
def test_output_all
plan, reader = build_plan do
Arrow::HashJoinNodeOptions.new(:left_outer,
["number"],
["right_number"])
end
execute(plan) do
left_number = build_int8_array([1, 2, 3, 4, 5])
left_string = build_string_array(["a", "b", "a", "b", "a"])
right_number = build_int8_array([1, 2, nil, nil, nil])
right_string = build_string_array(["R-1", "R-2", nil, nil, nil])
assert_equal(build_table("number" => left_number,
"string" => left_string,
"right_number" => right_number,
"right_string" => right_string),
reader.read_all)
end
end
def test_output_selected
plan, reader = build_plan do
options = Arrow::HashJoinNodeOptions.new(:left_outer,
["number"],
["right_number"])
options.left_outputs = ["number"]
options.right_outputs = ["right_number"]
options
end
execute(plan) do
left_number = build_int8_array([1, 2, 3, 4, 5])
right_number = build_int8_array([1, 2, nil, nil, nil])
assert_equal(build_table("number" => left_number,
"right_number" => right_number),
reader.read_all)
end
end
end
end