| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| require 'logger' |
| require 'thread' |
| |
| module Thrift |
| # this class expects to always use a FramedTransport for reading messages |
| class NonblockingServer < BaseServer |
| def initialize(processor, server_transport, transport_factory=nil, protocol_factory=nil, num=20, logger=nil) |
| super(processor, server_transport, transport_factory, protocol_factory) |
| @num_threads = num |
| if logger.nil? |
| @logger = Logger.new(STDERR) |
| @logger.level = Logger::WARN |
| else |
| @logger = logger |
| end |
| @shutdown_semaphore = Mutex.new |
| @transport_semaphore = Mutex.new |
| end |
| |
| def serve |
| @logger.info "Starting #{self}" |
| @server_transport.listen |
| @io_manager = start_io_manager |
| |
| begin |
| loop do |
| break if @server_transport.closed? |
| rd, = select([@server_transport], nil, nil, 0.1) |
| next if rd.nil? |
| socket = @server_transport.accept |
| @logger.debug "Accepted socket: #{socket.inspect}" |
| @io_manager.add_connection socket |
| end |
| rescue IOError => e |
| end |
| # we must be shutting down |
| @logger.info "#{self} is shutting down, goodbye" |
| ensure |
| @transport_semaphore.synchronize do |
| @server_transport.close |
| end |
| @io_manager.ensure_closed unless @io_manager.nil? |
| end |
| |
| def shutdown(timeout = 0, block = true) |
| @shutdown_semaphore.synchronize do |
| return if @is_shutdown |
| @is_shutdown = true |
| end |
| # nonblocking is intended for calling from within a Handler |
| # but we can't change the order of operations here, so lets thread |
| shutdown_proc = lambda do |
| @io_manager.shutdown(timeout) |
| @transport_semaphore.synchronize do |
| @server_transport.close # this will break the accept loop |
| end |
| end |
| if block |
| shutdown_proc.call |
| else |
| Thread.new &shutdown_proc |
| end |
| end |
| |
| private |
| |
| def start_io_manager |
| iom = IOManager.new(@processor, @server_transport, @transport_factory, @protocol_factory, @num_threads, @logger) |
| iom.spawn |
| iom |
| end |
| |
| class IOManager # :nodoc: |
| DEFAULT_BUFFER = 2**20 |
| |
| def initialize(processor, server_transport, transport_factory, protocol_factory, num, logger) |
| @processor = processor |
| @server_transport = server_transport |
| @transport_factory = transport_factory |
| @protocol_factory = protocol_factory |
| @num_threads = num |
| @logger = logger |
| @connections = [] |
| @buffers = Hash.new { |h,k| h[k] = '' } |
| @signal_queue = Queue.new |
| @signal_pipes = IO.pipe |
| @signal_pipes[1].sync = true |
| @worker_queue = Queue.new |
| @shutdown_queue = Queue.new |
| end |
| |
| def add_connection(socket) |
| signal [:connection, socket] |
| end |
| |
| def spawn |
| @iom_thread = Thread.new do |
| @logger.debug "Starting #{self}" |
| run |
| end |
| end |
| |
| def shutdown(timeout = 0) |
| @logger.debug "#{self} is shutting down workers" |
| @worker_queue.clear |
| @num_threads.times { @worker_queue.push [:shutdown] } |
| signal [:shutdown, timeout] |
| @shutdown_queue.pop |
| @signal_pipes[0].close |
| @signal_pipes[1].close |
| @logger.debug "#{self} is shutting down, goodbye" |
| end |
| |
| def ensure_closed |
| kill_worker_threads if @worker_threads |
| @iom_thread.kill |
| end |
| |
| private |
| |
| def run |
| spin_worker_threads |
| |
| loop do |
| rd, = select([@signal_pipes[0], *@connections]) |
| if rd.delete @signal_pipes[0] |
| break if read_signals == :shutdown |
| end |
| rd.each do |fd| |
| if fd.handle.eof? |
| remove_connection fd |
| else |
| read_connection fd |
| end |
| end |
| end |
| join_worker_threads(@shutdown_timeout) |
| ensure |
| @shutdown_queue.push :shutdown |
| end |
| |
| def read_connection(fd) |
| @buffers[fd] << fd.read(DEFAULT_BUFFER) |
| frame = slice_frame!(@buffers[fd]) |
| if frame |
| @logger.debug "#{self} is processing a frame" |
| @worker_queue.push [:frame, fd, frame] |
| end |
| end |
| |
| def spin_worker_threads |
| @logger.debug "#{self} is spinning up worker threads" |
| @worker_threads = [] |
| @num_threads.times do |
| @worker_threads << spin_thread |
| end |
| end |
| |
| def spin_thread |
| Worker.new(@processor, @transport_factory, @protocol_factory, @logger, @worker_queue).spawn |
| end |
| |
| def signal(msg) |
| @signal_queue << msg |
| @signal_pipes[1].write " " |
| end |
| |
| def read_signals |
| # clear the signal pipe |
| # note that since read_nonblock is broken in jruby, |
| # we can only read up to a set number of signals at once |
| sigstr = @signal_pipes[0].readpartial(1024) |
| # now read the signals |
| begin |
| sigstr.length.times do |
| signal, obj = @signal_queue.pop(true) |
| case signal |
| when :connection |
| @connections << obj |
| when :shutdown |
| @shutdown_timeout = obj |
| return :shutdown |
| end |
| end |
| rescue ThreadError |
| # out of signals |
| # note that in a perfect world this would never happen, since we're |
| # only reading the number of signals pushed on the pipe, but given the lack |
| # of locks, in theory we could clear the pipe/queue while a new signal is being |
| # placed on the pipe, at which point our next read_signals would hit this error |
| end |
| end |
| |
| def remove_connection(fd) |
| # don't explicitly close it, a thread may still be writing to it |
| @connections.delete fd |
| @buffers.delete fd |
| end |
| |
| def join_worker_threads(shutdown_timeout) |
| start = Time.now |
| @worker_threads.each do |t| |
| if shutdown_timeout > 0 |
| timeout = (start + shutdown_timeout) - Time.now |
| break if timeout <= 0 |
| t.join(timeout) |
| else |
| t.join |
| end |
| end |
| kill_worker_threads |
| end |
| |
| def kill_worker_threads |
| @worker_threads.each do |t| |
| t.kill if t.status |
| end |
| @worker_threads.clear |
| end |
| |
| def slice_frame!(buf) |
| if buf.length >= 4 |
| size = buf.unpack('N').first |
| if buf.length >= size + 4 |
| buf.slice!(0, size + 4) |
| else |
| nil |
| end |
| else |
| nil |
| end |
| end |
| |
| class Worker # :nodoc: |
| def initialize(processor, transport_factory, protocol_factory, logger, queue) |
| @processor = processor |
| @transport_factory = transport_factory |
| @protocol_factory = protocol_factory |
| @logger = logger |
| @queue = queue |
| end |
| |
| def spawn |
| Thread.new do |
| @logger.debug "#{self} is spawning" |
| run |
| end |
| end |
| |
| private |
| |
| def run |
| loop do |
| cmd, *args = @queue.pop |
| case cmd |
| when :shutdown |
| @logger.debug "#{self} is shutting down, goodbye" |
| break |
| when :frame |
| fd, frame = args |
| begin |
| otrans = @transport_factory.get_transport(fd) |
| oprot = @protocol_factory.get_protocol(otrans) |
| membuf = MemoryBufferTransport.new(frame) |
| itrans = @transport_factory.get_transport(membuf) |
| iprot = @protocol_factory.get_protocol(itrans) |
| @processor.process(iprot, oprot) |
| rescue => e |
| @logger.error "#{Thread.current.inspect} raised error: #{e.inspect}\n#{e.backtrace.join("\n")}" |
| end |
| end |
| end |
| end |
| end |
| end |
| end |
| end |