| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| class TestStreamDecoder < Test::Unit::TestCase |
| include Helper::Buildable |
| |
| class Listener < Arrow::StreamListener |
| type_register |
| |
| attr_reader :events |
| def initialize |
| super |
| @events = [] |
| end |
| |
| private |
| def virtual_do_on_eos |
| @events << [:eos] |
| true |
| end |
| |
| def virtual_do_on_record_batch_decoded(record_batch, metadata) |
| @events << [:record_batch_decoded, record_batch, metadata] |
| true |
| end |
| |
| def virtual_do_on_schema_decoded(schema, filtered_schema) |
| @events << [:schema_decoded, schema, filtered_schema] |
| true |
| end |
| end |
| |
| def setup |
| columns = { |
| "enabled": build_boolean_array([true, false, nil, true]), |
| } |
| @record_batch = build_record_batch(columns) |
| @schema = @record_batch.schema |
| |
| @buffer = Arrow::ResizableBuffer.new(0) |
| output = Arrow::BufferOutputStream.new(@buffer) |
| stream_writer = Arrow::RecordBatchStreamWriter.new(output, @schema) |
| stream_writer.write_record_batch(@record_batch) |
| stream_writer.close |
| output.close |
| |
| @listener = Listener.new |
| @decoder = Arrow::StreamDecoder.new(@listener) |
| end |
| |
| def test_listener |
| assert_equal(@listener, @decoder.listener) |
| end |
| |
| def test_consume_bytes |
| @buffer.data.to_s.each_byte do |byte| |
| @decoder.consume_bytes(GLib::Bytes.new(byte.chr)) |
| end |
| assert_equal([ |
| [:schema_decoded, @schema, @schema], |
| [:record_batch_decoded, @record_batch, nil], |
| [:eos], |
| ], |
| @listener.events) |
| end |
| |
| def test_consume_buffer |
| # We need to keep data that aren't processed yet. |
| data = [] |
| @buffer.data.to_s.each_byte do |byte| |
| data << byte.chr |
| can_clear = (@decoder.next_required_size == 1) |
| @decoder.consume_buffer(Arrow::Buffer.new(data.last)) |
| # We can release a reference for kept data after they are |
| # processed. |
| data.clear if can_clear |
| end |
| assert_equal([ |
| [:schema_decoded, @schema, @schema], |
| [:record_batch_decoded, @record_batch, nil], |
| [:eos], |
| ], |
| @listener.events) |
| end |
| |
| def test_reset |
| @decoder.consume_bytes(@buffer.data.to_s[0, 10]) |
| @decoder.reset |
| @decoder.consume_bytes(@buffer.data) |
| assert_equal([ |
| [:schema_decoded, @schema, @schema], |
| [:record_batch_decoded, @record_batch, nil], |
| [:eos], |
| ], |
| @listener.events) |
| end |
| |
| def test_schema |
| assert_nil(@decoder.schema) |
| @decoder.consume_bytes(@buffer.data) |
| assert_equal(@schema, @decoder.schema) |
| end |
| |
| def test_next_required_size |
| data = @buffer.data.to_s |
| loop do |
| next_required_size = @decoder.next_required_size |
| break if next_required_size.zero? |
| @decoder.consume_bytes(data[0, next_required_size]) |
| data = data[next_required_size..-1] |
| end |
| assert_equal([ |
| [:schema_decoded, @schema, @schema], |
| [:record_batch_decoded, @record_batch, nil], |
| [:eos], |
| ], |
| @listener.events) |
| end |
| end |