blob: 705d9d7e596cf509e3122e46dc5a45231e4fbc7f [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import uuid
from concurrent.futures import Future
from six.moves import queue
from gremlin_python.driver import resultset
__author__ = 'David M. Brown (davebshow@gmail.com)'
class Connection:
def __init__(self, url, traversal_source, protocol, transport_factory,
executor, pool, headers=None):
self._url = url
self._headers = headers
self._traversal_source = traversal_source
self._protocol = protocol
self._transport_factory = transport_factory
self._executor = executor
self._transport = None
self._pool = pool
self._results = {}
self._inited = False
def connect(self):
if self._transport:
self._transport.close()
self._transport = self._transport_factory()
self._transport.connect(self._url, self._headers)
self._protocol.connection_made(self._transport)
self._inited = True
def close(self):
if self._inited:
self._transport.close()
def write(self, request_message):
if not self._inited:
self.connect()
request_id = str(uuid.uuid4())
result_set = resultset.ResultSet(queue.Queue(), request_id)
self._results[request_id] = result_set
# Create write task
future = Future()
future_write = self._executor.submit(
self._protocol.write, request_id, request_message)
def cb(f):
try:
f.result()
except Exception as e:
future.set_exception(e)
self._pool.put_nowait(self)
else:
# Start receive task
done = self._executor.submit(self._receive)
result_set.done = done
future.set_result(result_set)
future_write.add_done_callback(cb)
return future
def _receive(self):
try:
while True:
data = self._transport.read()
status_code = self._protocol.data_received(data, self._results)
if status_code != 206:
break
finally:
self._pool.put_nowait(self)