# blob: 008dbfb7e3df2c1af9d8b97836cfe54c1e247638 [file] [log] [blame]
require 'thread'
require 'timeout'
require 'listen/event/processor'
module Listen
  module Event
    # Owns the background "wait thread" that runs an Event::Processor and
    # exposes the controls used to start it (#setup), wake it (#resume,
    # #wakeup_on_event) and shut it down (#teardown).
    #
    # Wake-ups are delivered by pushing a reason symbol (:event, :resume or
    # :teardown) onto a queue shared with the processor and then calling
    # Thread#wakeup on the wait thread.
    class Loop
      # Base class for errors raised by Loop.
      class Error < RuntimeError
        # Raised by Loop#resume when there is no wait thread, i.e. #setup has
        # not been called or #teardown has already run.
        class NotStarted < Error
        end
      end

      # @param config [Object] passed through to Event::Processor.new; must
      #   respond to #min_delay_between_events (read in #_wait_for_changes).
      def initialize(config)
        @config = config
        @wait_thread = nil
        @state = :paused
        @reasons = ::Queue.new
      end

      # Wakes the wait thread with reason :event.
      #
      # Does nothing unless the loop has been set up, is in the :processing
      # state, and its thread is still alive.
      def wakeup_on_event
        return if stopped?
        return unless processing?
        return unless wait_thread.alive?

        _wakeup(:event)
      end

      # @return [Boolean, nil] truthy when a wait thread exists and the state
      #   is :paused; nil when there is no wait thread.
      def paused?
        wait_thread && state == :paused
      end

      # @return [Boolean] true only when a wait thread exists and the state is
      #   :processing.
      def processing?
        # A paused loop has state == :paused, so a separate paused? check
        # would be redundant here.
        !stopped? && state == :processing
      end

      # Starts the wait thread and blocks until it reports that it is ready.
      #
      # @raise [Timeout::Error] if the thread does not report readiness
      #   within 5 seconds.
      def setup
        # TODO: use a Fiber instead?
        ready = ::Queue.new
        @wait_thread = Internals::ThreadPool.add do
          _wait_for_changes(ready, config)
        end

        Listen::Logger.debug('Waiting for processing to start...')
        Timeout.timeout(5) { ready.pop }
      end

      # Wakes the wait thread with reason :resume.
      #
      # @raise [Error::NotStarted] if there is no wait thread.
      def resume
        # stopped? is exactly "no wait thread", so no further nil check on
        # wait_thread is needed after this guard.
        raise Error::NotStarted if stopped?

        _wakeup(:resume)
      end

      # Currently a no-op.
      def pause
        # TODO: works?
        # fail NotImplementedError
      end

      # Stops the loop: wakes a live wait thread with reason :teardown, waits
      # for it to finish, then forgets the thread so that #stopped? is true.
      # Does nothing when there is no wait thread.
      def teardown
        return unless wait_thread

        if wait_thread.alive?
          _wakeup(:teardown)
          wait_thread.join
        end
        @wait_thread = nil
      end

      # @return [Boolean] true when there is no wait thread.
      def stopped?
        !wait_thread
      end

      private

      attr_reader :config
      attr_reader :wait_thread
      attr_accessor :state

      # Body of the wait thread: builds the processor, parks until woken,
      # then runs the processor loop. Any StandardError is logged rather than
      # propagated.
      def _wait_for_changes(ready_queue, config)
        processor = Event::Processor.new(config, @reasons)

        _wait_until_resumed(ready_queue)
        processor.loop_for(config.min_delay_between_events)
      rescue StandardError => ex
        _nice_error(ex)
      end

      # Thin wrapper around Kernel.sleep.
      def _sleep(*args)
        Kernel.sleep(*args)
      end

      # Marks the loop paused, signals readiness on +ready_queue+, sleeps
      # until woken, then marks the loop as processing.
      def _wait_until_resumed(ready_queue)
        self.state = :paused
        ready_queue << :ready
        # NOTE(review): :ready is published before this thread is actually
        # asleep; a wakeup issued inside that window may be missed - confirm.
        sleep
        self.state = :processing
      end

      # Logs +ex+ together with its backtrace, one frame per line.
      def _nice_error(ex)
        indent = "\n -- "
        msg = format(
          'exception while processing events: %s Backtrace:%s%s',
          ex,
          indent,
          # backtrace is nil for an exception that carries none; Array()
          # keeps the logger itself from raising in that case.
          Array(ex.backtrace).join(indent)
        )
        Listen::Logger.error(msg)
      end

      # Records why the wait thread is being woken, then wakes it.
      def _wakeup(reason)
        @reasons << reason
        wait_thread.wakeup
      end
    end
  end
end