blob: 58dd12da2316583befd9f4cde9d7ab534641723c [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import logging
from concurrent.futures import Future
import warnings
from gremlin_python.driver import client, serializer
from gremlin_python.driver.remote_connection import RemoteConnection, RemoteTraversal
from gremlin_python.driver.request import Tokens
# Module-level logger used by DriverRemoteConnection for its info/debug messages.
log = logging.getLogger("gremlinpython")
__author__ = 'David M. Brown (davebshow@gmail.com), Lyndon Bauto (lyndonb@bitquilltech.com)'
class DriverRemoteConnection(RemoteConnection):
    """Remote connection that submits gremlin-lang scripts through a ``client.Client``.

    Every submission prefixes the script with the configured traversal source,
    derives per-request options from the script's option strategies and
    parameters, and wraps the collected results in a ``RemoteTraversal``.
    """

    # NOTE(review): the ``request_serializer`` default is instantiated once, when
    # this ``def`` is evaluated, so connections that do not pass their own
    # serializer share that instance. It is left as-is because switching the
    # default to ``None`` would change what is handed to ``client.Client`` when a
    # caller passes ``None`` explicitly.
    def __init__(self, url, traversal_source="g", protocol_factory=None,
                 transport_factory=None, pool_size=None, max_workers=None,
                 request_serializer=serializer.GraphBinarySerializersV4(),
                 response_serializer=None, interceptors=None, auth=None,
                 headers=None, enable_user_agent_on_connect=True,
                 bulk_results=False, **transport_kwargs):
        """Create the underlying ``client.Client`` for ``url``.

        All arguments are forwarded to ``client.Client`` unchanged, except that
        a ``response_serializer`` of ``None`` is replaced by a fresh
        ``serializer.GraphBinarySerializersV4()``.

        :param url: server endpoint handed to the client.
        :param traversal_source: name of the traversal source (default ``"g"``).
        :param transport_kwargs: extra keyword arguments passed through to the
            client.
        """
        log.info("Creating DriverRemoteConnection with url '%s'", str(url))
        # Constructor arguments are kept on name-mangled private attributes;
        # they are not read anywhere else in this class.
        self.__url = url
        self.__traversal_source = traversal_source
        self.__protocol_factory = protocol_factory
        self.__transport_factory = transport_factory
        self.__pool_size = pool_size
        self.__max_workers = max_workers
        self.__auth = auth
        self.__headers = headers
        self.__enable_user_agent_on_connect = enable_user_agent_on_connect
        self.__bulk_results = bulk_results
        self.__transport_kwargs = transport_kwargs

        if response_serializer is None:
            response_serializer = serializer.GraphBinarySerializersV4()

        self._client = client.Client(url, traversal_source,
                                     protocol_factory=protocol_factory,
                                     transport_factory=transport_factory,
                                     pool_size=pool_size,
                                     max_workers=max_workers,
                                     request_serializer=request_serializer,
                                     response_serializer=response_serializer,
                                     interceptors=interceptors, auth=auth,
                                     headers=headers,
                                     enable_user_agent_on_connect=enable_user_agent_on_connect,
                                     bulk_results=bulk_results,
                                     **transport_kwargs)
        # Mirror the values the client settled on rather than the raw arguments.
        self._url = self._client._url
        self._traversal_source = self._client._traversal_source

    def close(self):
        """Close the underlying client."""
        log.info("closing DriverRemoteConnection with url '%s'", str(self._url))
        self._client.close()

    def submit(self, gremlin_lang):
        """Submit ``gremlin_lang`` and block until all results are available.

        :param gremlin_lang: script builder exposing ``get_gremlin()``,
            ``add_g()``, ``options_strategies`` and ``parameters``.
        :return: a ``RemoteTraversal`` iterating over the collected results.
        """
        log.debug("submit with gremlin lang script '%s'", gremlin_lang.get_gremlin())
        gremlin_lang.add_g(self._traversal_source)
        result_set = self._client.submit(
            gremlin_lang.get_gremlin(),
            request_options=self.extract_request_options(gremlin_lang))
        results = result_set.all().result()
        return RemoteTraversal(iter(results))

    def submitAsync(self, message, bindings=None, request_options=None):
        """Deprecated alias of :meth:`submit_async`.

        :param message: the gremlin-lang object to submit; forwarded as the
            ``gremlin_lang`` argument of ``submit_async``.
        :param bindings: accepted for backward compatibility only; not
            forwarded, because ``submit_async`` takes a single argument.
        :param request_options: accepted for backward compatibility only; not
            forwarded, because ``submit_async`` builds the request options
            from ``message`` itself.
        :return: the ``Future`` returned by ``submit_async``.
        """
        warnings.warn(
            "gremlin_python.driver.driver_remote_connection.DriverRemoteConnection.submitAsync will be replaced by "
            "gremlin_python.driver.driver_remote_connection.DriverRemoteConnection.submit_async.",
            DeprecationWarning,
            stacklevel=2)  # attribute the warning to the caller, not this shim
        # submit_async accepts only the gremlin-lang object; passing the legacy
        # arguments positionally raised TypeError, and the future was dropped.
        return self.submit_async(message)

    def submit_async(self, gremlin_lang):
        """Submit ``gremlin_lang`` without blocking the caller.

        :param gremlin_lang: script builder exposing ``get_gremlin()``,
            ``add_g()``, ``options_strategies`` and ``parameters``.
        :return: a ``concurrent.futures.Future`` that resolves to a
            ``RemoteTraversal`` over the results, or carries the exception
            raised while obtaining them.
        """
        log.debug("submit_async with gremlin lang script '%s'", gremlin_lang.get_gremlin())
        future = Future()
        gremlin_lang.add_g(self._traversal_source)
        future_result_set = self._client.submit_async(
            gremlin_lang.get_gremlin(),
            request_options=self.extract_request_options(gremlin_lang))

        def cb(f):
            # Runs when the client's future completes: drain the result set
            # and relay either the traversal or the failure to our own future.
            try:
                result_set = f.result()
                results = result_set.all().result()
                future.set_result(RemoteTraversal(iter(results)))
            except Exception as e:
                future.set_exception(e)

        future_result_set.add_done_callback(cb)
        return future

    def is_closed(self):
        """Return whatever the underlying client's ``is_closed()`` reports."""
        return self._client.is_closed()

    # TODO remove or update once HTTP transaction is implemented
    # def commit(self):
    #     log.info("Submitting commit graph operation.")
    #     return self._client.submit(Bytecode.GraphOp.commit())
    #
    # def rollback(self):
    #     log.info("Submitting rollback graph operation.")
    #     return self._client.submit(Bytecode.GraphOp.rollback())

    @staticmethod
    def extract_request_options(gremlin_lang):
        """Build the per-request options dict for ``gremlin_lang``.

        Copies every key listed in ``Tokens`` that appears in the configuration
        of any of ``gremlin_lang.options_strategies`` (later strategies
        overwrite earlier ones), defaults ``'bulkResults'`` to ``True`` when no
        strategy set it, and adds non-empty ``gremlin_lang.parameters`` under
        ``"params"``.

        :return: a new ``dict`` of request options.
        """
        request_options = {}
        for options_strategy in gremlin_lang.options_strategies:
            configuration = options_strategy.configuration
            request_options.update({token: configuration[token] for token in Tokens
                                    if token in configuration})
        # request the server to bulk results by default when using drc through request options
        request_options.setdefault('bulkResults', True)
        if gremlin_lang.parameters:
            request_options["params"] = gremlin_lang.parameters
        return request_options