# blob: 82946a725d6f698fc42feb0eedec41aba98af175 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from concurrent.futures import Future
# Author metadata for this module.
__author__ = 'David M. Brown (davebshow@gmail.com)'
class ResultSet:
    """Iterable handle over the results of a single request.

    The instance wraps two collaborators supplied from outside:

    * ``stream`` -- a queue-like object; only ``empty()`` and
      ``get_nowait()`` are called on it here. This class never writes to
      the stream (presumably another component fills it -- confirm against
      the caller).
    * ``done`` -- a future-like object assigned through the ``done``
      property after construction; ``done()``, ``result()`` and
      ``add_done_callback()`` are called on it. It is ``None`` until
      assigned, so ``one()``, ``all()`` and iteration require it to be set
      first.
    """

    def __init__(self, stream, request_id):
        """Store the stream and request id; ``done`` and ``aggregate_to``
        start out as ``None``."""
        self._stream = stream
        self._request_id = request_id
        self._done = None
        self._aggregate_to = None

    @property
    def aggregate_to(self):
        """Value stored by the ``aggregate_to`` setter (``None`` by default).

        It is only stored and returned by this class, never interpreted.
        """
        return self._aggregate_to

    @aggregate_to.setter
    def aggregate_to(self, val):
        self._aggregate_to = val

    @property
    def request_id(self):
        """Request identifier passed to the constructor (read-only)."""
        return self._request_id

    @property
    def stream(self):
        """Queue-like object passed to the constructor (read-only)."""
        return self._stream

    def __iter__(self):
        """Return ``self``; the result set is its own iterator."""
        return self

    def __next__(self):
        """Return the next value from ``one()``.

        Iteration stops as soon as ``one()`` returns a falsy value, so an
        empty item ends the iteration just like the final ``None`` does.

        Raises:
            StopIteration: when ``one()`` returns a falsy value.
        """
        result = self.one()
        if not result:
            raise StopIteration
        return result

    def next(self):
        """Alias for ``__next__`` (the Python 2 iterator protocol name)."""
        return self.__next__()

    @property
    def done(self):
        """Future-like object signalling completion; ``None`` until set."""
        return self._done

    @done.setter
    def done(self, future):
        self._done = future

    def one(self):
        """Return the next item from the stream, waiting for one if needed.

        While ``done`` has not completed, the stream is polled until an item
        is available. Once ``done`` has completed, a remaining item is
        returned if there is one; otherwise the call returns
        ``self.done.result()``, which re-raises the exception stored on the
        future if it failed.
        """
        # NOTE(review): this loop polls without blocking or sleeping, so it
        # spins while waiting for the next item. Switching to a blocking
        # read needs knowledge of the stream type that is not visible here.
        while not self.done.done():
            if not self.stream.empty():
                return self.stream.get_nowait()
        # An item may have been queued between the last ``empty()`` check
        # and the completion of ``done``; hand it out before finishing.
        if not self.stream.empty():
            return self.stream.get_nowait()
        return self.done.result()

    def all(self):
        """Return a future resolving to every remaining result in one list.

        A callback is registered on ``done``. When it fires, every item
        still in the stream is concatenated (``+=``) onto a single list, so
        each stream item must itself be iterable.

        Returns:
            concurrent.futures.Future: resolves to the combined list, or
            carries the exception raised by ``done`` or by draining the
            stream.
        """
        future = Future()

        def cb(f):
            try:
                f.result()
                # Drain inside the ``try``: an exception escaping a done
                # callback is only logged by concurrent.futures, which would
                # leave ``future`` unresolved and its waiters blocked
                # forever.
                results = []
                while not self.stream.empty():
                    results += self.stream.get_nowait()
            except Exception as e:
                future.set_exception(e)
            else:
                future.set_result(results)

        self.done.add_done_callback(cb)
        return future