blob: 613d883904ae4a8b361e20bd94964128954f074f [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
require 'spec_helper'
describe 'NonblockingServer' do
class Handler
def initialize
@queue = Queue.new
end
attr_accessor :server
def greeting(english)
if english
SpecNamespace::Hello.new
else
SpecNamespace::Hello.new(:greeting => "Aloha!")
end
end
def block
@queue.pop
end
def unblock(n)
n.times { @queue.push true }
end
def sleep(time)
Kernel.sleep time
end
def shutdown
@server.shutdown(0, false)
end
end
class SpecTransport < Thrift::BaseTransport
def initialize(transport, queue)
@transport = transport
@queue = queue
@flushed = false
end
def open?
@transport.open?
end
def open
@transport.open
end
def close
@transport.close
end
def read(sz)
@transport.read(sz)
end
def write(buf,sz=nil)
@transport.write(buf, sz)
end
def flush
@queue.push :flushed unless @flushed or @queue.nil?
@flushed = true
@transport.flush
end
end
class SpecServerSocket < Thrift::ServerSocket
def initialize(host, port, queue)
super(host, port)
@queue = queue
end
def listen
super
@queue.push :listen
end
end
describe Thrift::NonblockingServer do
before(:each) do
@port = 43251
handler = Handler.new
processor = SpecNamespace::NonblockingService::Processor.new(handler)
queue = Queue.new
@transport = SpecServerSocket.new('localhost', @port, queue)
transport_factory = Thrift::FramedTransportFactory.new
logger = Logger.new(STDERR)
logger.level = Logger::WARN
@server = Thrift::NonblockingServer.new(processor, @transport, transport_factory, nil, 5, logger)
handler.server = @server
@server_thread = Thread.new(Thread.current) do |master_thread|
begin
@server.serve
rescue => e
p e
puts e.backtrace * "\n"
master_thread.raise e
end
end
queue.pop
@clients = []
@catch_exceptions = false
end
after(:each) do
@clients.each { |client, trans| trans.close }
# @server.shutdown(1)
@server_thread.kill
@transport.close
end
def setup_client(queue = nil)
transport = SpecTransport.new(Thrift::FramedTransport.new(Thrift::Socket.new('localhost', @port)), queue)
protocol = Thrift::BinaryProtocol.new(transport)
client = SpecNamespace::NonblockingService::Client.new(protocol)
transport.open
@clients << [client, transport]
client
end
def setup_client_thread(result)
queue = Queue.new
Thread.new do
begin
client = setup_client
while (cmd = queue.pop)
msg, *args = cmd
case msg
when :block
result << client.block
when :unblock
client.unblock(args.first)
when :hello
result << client.greeting(true) # ignore result
when :sleep
client.sleep(args[0] || 0.5)
result << :slept
when :shutdown
client.shutdown
when :exit
result << :done
break
end
end
@clients.each { |c,t| t.close and break if c == client } #close the transport
rescue => e
raise e unless @catch_exceptions
end
end
queue
end
it "should handle basic message passing" do
client = setup_client
expect(client.greeting(true)).to eq(SpecNamespace::Hello.new)
expect(client.greeting(false)).to eq(SpecNamespace::Hello.new(:greeting => 'Aloha!'))
@server.shutdown
end
it "should handle concurrent clients" do
queue = Queue.new
trans_queue = Queue.new
4.times do
Thread.new(Thread.current) do |main_thread|
begin
queue.push setup_client(trans_queue).block
rescue => e
main_thread.raise e
end
end
end
4.times { trans_queue.pop }
setup_client.unblock(4)
4.times { expect(queue.pop).to be_truthy }
@server.shutdown
end
it "should handle messages from more than 5 long-lived connections" do
queues = []
result = Queue.new
7.times do |i|
queues << setup_client_thread(result)
Thread.pass if i == 4 # give the server time to accept connections
end
client = setup_client
# block 4 connections
4.times { |i| queues[i] << :block }
queues[4] << :hello
queues[5] << :hello
queues[6] << :hello
3.times { expect(result.pop).to eq(SpecNamespace::Hello.new) }
expect(client.greeting(true)).to eq(SpecNamespace::Hello.new)
queues[5] << [:unblock, 4]
4.times { expect(result.pop).to be_truthy }
queues[2] << :hello
expect(result.pop).to eq(SpecNamespace::Hello.new)
expect(client.greeting(false)).to eq(SpecNamespace::Hello.new(:greeting => 'Aloha!'))
7.times { queues.shift << :exit }
expect(client.greeting(true)).to eq(SpecNamespace::Hello.new)
@server.shutdown
end
it "should shut down when asked" do
# connect first to ensure it's running
client = setup_client
client.greeting(false) # force a message pass
@server.shutdown
expect(@server_thread.join(2)).to be_an_instance_of(Thread)
end
it "should continue processing active messages when shutting down" do
result = Queue.new
client = setup_client_thread(result)
client << :sleep
sleep 0.1 # give the server time to start processing the client's message
@server.shutdown
expect(@server_thread.join(2)).to be_an_instance_of(Thread)
expect(result.pop).to eq(:slept)
end
it "should kill active messages when they don't expire while shutting down" do
result = Queue.new
client = setup_client_thread(result)
client << [:sleep, 10]
sleep 0.1 # start processing the client's message
@server.shutdown(1)
@catch_exceptions = true
expect(@server_thread.join(3)).not_to be_nil
expect(result).to be_empty
end
it "should allow shutting down in response to a message" do
client = setup_client
expect(client.greeting(true)).to eq(SpecNamespace::Hello.new)
client.shutdown
expect(@server_thread.join(2)).not_to be_nil
end
end
end