blob: 4b136056b6412419076c744b325ec260971a16c7 [file] [log] [blame]
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
'''gateway_looper.py'''
import asyncore
import errno
import os
import time
import select
from .event_looper import EventLooper
from heron.common.src.python.utils.log import Log
class GatewayLooper(EventLooper):
"""A GatewayLooper, inheriting EventLooper
It is a class wrapping Python's asyncore module (and selector) to dispatch events.
This class can be used as a looper for an ``asyncore.dispatcher`` class, instead of calling
``asyncore.loop()``.
As it is a subclass of EventLooper, it will execute in a while loop until
the ``exit_loop()`` is called.
In order to use this class, users first need to specify a socket map that maps from
a file descriptor to ``asyncore.dispatcher`` class, using ``prepare_map()`` method.
The GatewayLooper will dispatch ready events that are in the specified map.
"""
def __init__(self, socket_map):
"""Initializes a GatewayLooper instance
:param socket_map: socket map used for asyncore.dispatcher
"""
super(GatewayLooper, self).__init__()
self.sock_map = socket_map
# Pipe used for wake up select
self.pipe_r, self.pipe_w = os.pipe()
self.started = time.time()
Log.debug("Gateway Looper started time: " + str(time.asctime()))
def do_wait(self):
next_timeout = self._get_next_timeout_interval()
if next_timeout > 0:
self.poll(timeout=next_timeout)
else:
self.poll(timeout=0.0)
def wake_up(self):
os.write(self.pipe_w, "\n")
Log.debug("Wake up called")
def on_exit(self):
super(GatewayLooper, self).on_exit()
os.close(self.pipe_r)
os.close(self.pipe_w)
# pylint: disable=too-many-branches
def poll(self, timeout=0.0):
"""Modified version of poll() from asyncore module"""
if self.sock_map is None:
Log.warning("Socket map is not registered to Gateway Looper")
readable_lst = []
writable_lst = []
error_lst = []
if self.sock_map is not None:
for fd, obj in list(self.sock_map.items()):
is_r = obj.readable()
is_w = obj.writable()
if is_r:
readable_lst.append(fd)
if is_w and not obj.accepting:
writable_lst.append(fd)
if is_r or is_w:
error_lst.append(fd)
# Add wakeup fd
readable_lst.append(self.pipe_r)
Log.debug("Will select() with timeout: " + str(timeout) + ", with map: " + str(self.sock_map))
try:
readable_lst, writable_lst, error_lst = \
select.select(readable_lst, writable_lst, error_lst, timeout)
except select.error as err:
Log.debug("Trivial error: " + str(err))
if err.args[0] != errno.EINTR:
raise
else:
return
Log.debug("Selected [r]: " + str(readable_lst) +
" [w]: " + str(writable_lst) + " [e]: " + str(error_lst))
if self.pipe_r in readable_lst:
Log.debug("Read from pipe")
os.read(self.pipe_r, 1024)
readable_lst.remove(self.pipe_r)
if self.sock_map is not None:
for fd in readable_lst:
obj = self.sock_map.get(fd)
if obj is None:
continue
asyncore.read(obj)
for fd in writable_lst:
obj = self.sock_map.get(fd)
if obj is None:
continue
asyncore.write(obj)
for fd in error_lst:
obj = self.sock_map.get(fd)
if obj is None:
continue
# pylint: disable=W0212
asyncore._exception(obj)