blob: 966ac8bc7a1c41f0d39ba38675c53d02f7fcbe47 [file] [log] [blame]
#!/usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import tornado.ioloop
from proton.reactor import Reactor
from proton.handlers import IOHandler
class TornadoApp:
def __init__(self, *args):
self.reactor = Reactor(*args)
self.reactor.global_handler = self
self.io = IOHandler()
self.loop = tornado.ioloop.IOLoop.instance()
self.count = 0
self.reactor.start()
self.reactor.process()
def on_reactor_quiesced(self, event):
event.reactor.yield_()
def on_unhandled(self, name, event):
event.dispatch(self.io)
def _events(self, sel):
events = self.loop.ERROR
if sel.reading:
events |= self.loop.READ
if sel.writing:
events |= self.loop.WRITE
return events
def _schedule(self, sel):
if sel.deadline:
self.loop.add_timeout(sel.deadline, lambda: self.expired(sel))
def _expired(self, sel):
sel.expired()
def _process(self):
self.reactor.process()
if not self.reactor.quiesced:
self.loop.add_callback(self._process)
def _callback(self, sel, events):
if self.loop.READ & events:
sel.readable()
if self.loop.WRITE & events:
sel.writable()
self._process()
def on_selectable_init(self, event):
sel = event.context
if sel.fileno() >= 0:
self.loop.add_handler(sel.fileno(), lambda fd, events: self._callback(sel, events), self._events(sel))
self._schedule(sel)
self.count += 1
def on_selectable_updated(self, event):
sel = event.context
if sel.fileno() > 0:
self.loop.update_handler(sel.fileno(), self._events(sel))
self._schedule(sel)
def on_selectable_final(self, event):
sel = event.context
if sel.fileno() > 0:
self.loop.remove_handler(sel.fileno())
sel.release()
self.count -= 1
if self.count == 0:
self.loop.add_callback(self._stop)
def _stop(self):
self.reactor.stop()
self.loop.stop()