blob: f8eb5adbeecb3347527d932f785083d5d6ee80f6 [file] [log] [blame]
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
'''communicator.py: module responsible for communication between Python heron modules'''
import sys
import Queue
from heron.common.src.python.utils.log import Log
class HeronCommunicator(object):
"""HeronCommunicator: a wrapper class for non-blocking queue in Heron.
Note that this class does not yet implement the dynamic tuning of expected available capacity,
as it is not necessary for single thread instance.
"""
def __init__(self, producer_cb=None, consumer_cb=None):
"""Initialize HeronCommunicator
:param producer_cb: Callback function to be called (usually on producer thread)
when ``poll()`` is called by the consumer. Default ``None``
:param consumer_cb: Callback function to be called (usually on consumer thread)
when ``offer()`` is called by the producer. Default ``None``
"""
self._producer_callback = producer_cb
self._consumer_callback = consumer_cb
self._buffer = Queue.Queue()
self.capacity = sys.maxsize
def register_capacity(self, capacity):
"""Registers the capacity of this communicator
By default, the capacity of HeronCommunicator is set to be ``sys.maxsize``
"""
self.capacity = capacity
def get_available_capacity(self):
return max(self.capacity - self.get_size(), 0)
def get_size(self):
"""Returns the size of the buffer"""
return self._buffer.qsize()
def is_empty(self):
"""Returns whether the buffer is empty"""
return self._buffer.empty()
def poll(self):
"""Poll from the buffer
It is a non-blocking operation, and when the buffer is empty, it raises Queue.Empty exception
"""
try:
# non-blocking
ret = self._buffer.get(block=False)
if self._producer_callback is not None:
self._producer_callback()
return ret
except Queue.Empty:
Log.debug("%s: Empty in poll()" % str(self))
raise Queue.Empty
def offer(self, item):
"""Offer to the buffer
It is a non-blocking operation, and when the buffer is full, it raises Queue.Full exception
"""
try:
# non-blocking
self._buffer.put(item, block=False)
if self._consumer_callback is not None:
self._consumer_callback()
return True
except Queue.Full:
Log.debug("%s: Full in offer()" % str(self))
raise Queue.Full
def clear(self):
"""Clear the buffer"""
while not self.is_empty():
self.poll()
def __str__(self):
return "HeronCommunicator"