blob: 7145f8c3892944c770eed5b1a809ca680d3bf1a5 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import asyncio
import os
import threading
import uuid
import pytest
from gremlin_python.driver.client import Client
from gremlin_python.driver.protocol import GremlinServerError
from gremlin_python.driver.request import RequestMessage
from gremlin_python.driver.serializer import GraphBinarySerializersV4
from gremlin_python.process.graph_traversal import __, GraphTraversalSource
from gremlin_python.process.traversal import TraversalStrategies, GValue
from gremlin_python.process.strategies import OptionsStrategy
from gremlin_python.structure.graph import Graph, Vertex
from gremlin_python.driver.aiohttp.transport import AiohttpHTTPTransport
from gremlin_python.statics import *
from asyncio import TimeoutError
# Module author metadata.
__author__ = 'David M. Brown (davebshow@gmail.com)'
# Server URL template. It can be overridden through the GREMLIN_SERVER_URL environment
# variable; the default carries a `{}` placeholder that is filled with a port number.
gremlin_server_url = os.environ.get('GREMLIN_SERVER_URL', 'http://localhost:{}/gremlin')
# URL used by tests that construct their own Client: the template with port 45940 substituted.
# NOTE(review): presumably the server endpoint without authentication, per the name -- confirm
# against the test server configuration.
test_no_auth_url = gremlin_server_url.format(45940)
def create_basic_request_message(traversal, source='gmodern'):
    """Build a ``RequestMessage`` carrying the gremlin script of ``traversal``.

    :param traversal: traversal whose ``gremlin_lang`` provides the script text.
    :param source: value sent as the ``g`` field of the request.
    :return: the constructed ``RequestMessage``.
    """
    request_fields = {'g': source}
    script = traversal.gremlin_lang.get_gremlin()
    return RequestMessage(fields=request_fields, gremlin=script)
def test_connection(connection):
    """Writing ``g.V()`` on a raw connection yields a list of six results and a done result set."""
    source = GraphTraversalSource(Graph(), TraversalStrategies())
    request = create_basic_request_message(source.V())
    results_set = connection.write(request).result()
    collected = results_set.all().result()
    assert len(collected) == 6
    assert isinstance(collected, list)
    assert results_set.done.done()
# TODO: revisit after max_content_length definition/implementation is updated
def test_client_message_too_big(client):
    """A response larger than ``max_content_length`` is rejected, and the client stays usable.

    The ``client`` fixture is intentionally replaced by a client limited to 1024 bytes.
    """
    # Build the client before entering the try block so the ``finally`` clause only ever
    # closes the client created here.
    client = Client(test_no_auth_url, 'g', max_content_length=1024)
    try:
        # pytest.raises reports a missing error directly. A bare ``assert False`` inside a
        # ``try``/``except Exception`` would itself be caught and fail later with an
        # unrelated IndexError on ``ex.args[0]``.
        with pytest.raises(Exception) as exc_info:
            client.submit("g.inject(' ').repeat(concat(' ')).times(1234)").all().result()
        message = exc_info.value.args[0]
        assert message.startswith('Response size') \
            and message.endswith('exceeds limit 1024 bytes')
        # confirm the client instance is still usable and not closed
        assert ["test"] == client.submit("g.inject('test')").all().result()
    finally:
        client.close()
def test_client_large_result(client):
    """A groovy script returning strings of 200000 and 100000 characters comes back intact."""
    script = "[\" \".repeat(200000), \" \".repeat(100000)]"
    options = {'language': 'gremlin-groovy'}
    strings = client.submit(script, request_options=options).all().result()
    assert len(strings[0]) == 200000
    assert len(strings[1]) == 100000
def test_client_script_submission(client):
    """A script injecting five values yields five results."""
    injected = client.submit("g.inject(0,0,0,0,0)").all().result()
    assert len(injected) == 5
def test_client_simple_eval(client):
    """The first result of ``g.inject(2)`` is 2."""
    values = client.submit('g.inject(2)').all().result()
    assert values[0] == 2
def test_client_simple_eval_bindings(client):
    """Bindings passed positionally are applied to the submitted script."""
    ages = client.submit('g.V(x).values("age")', {'x': 1}).all().result()
    assert ages[0] == 29
def test_client_eval_traversal(client):
    """Submitting ``g.V()`` as a script returns six results."""
    found = client.submit('g.V()').all().result()
    assert len(found) == 6
def test_client_eval_traversal_bindings(client):
    """Bindings passed through the ``bindings`` keyword are applied to the script."""
    names = client.submit('g.V(x).values("name")', bindings={'x': 1}).all().result()
    assert names[0] == 'marko'
def test_client_eval_traversal_request_options_bindings(client):
    """Bindings supplied inside ``request_options`` are applied to the script."""
    options = {'bindings': {'x': 1}}
    names = client.submit('g.V(x).values("name")', request_options=options).all().result()
    assert names[0] == 'marko'
def test_client_eval_traversal_bindings_request_options_bindings(client):
    """When both carry the same key, ``request_options['bindings']`` wins over ``bindings``."""
    # The request_options bindings are applied later, so x=2 replaces x=1 here.
    overriding = {'bindings': {'x': 2}}
    names = client.submit('g.V(x).values("name")', bindings={'x': 1},
                          request_options=overriding).all().result()
    assert names[0] == 'vadas'
def test_client_error(client):
    """A server-side failure raises ``GremlinServerError`` and leaves the client usable."""
    try:
        # dividing by zero must make the server report an error
        client.submit('g.inject(1).math("_/0")').all().result()
    except GremlinServerError as ex:
        assert ex.status_message.endswith('Division by zero!')
        assert ex.status_exception
        assert str(ex) == f"{ex.status_code}: {ex.status_message}"
    else:
        # no exception at all means the server accepted the bad query
        assert False
    # the same client must keep working after the failure
    ages = client.submit('g.V(x).values("age")', {'x': 1}).all().result()
    assert ages[0] == 29
def test_bad_serialization(client):
    """Submitting ``java.awt.Color.RED`` surfaces a ``GremlinServerError``; the client stays usable."""
    groovy = {'language': 'gremlin-groovy'}
    try:
        # the test expects this submission to end in a server error
        client.submit('java.awt.Color.RED', request_options=groovy).all().result()
    except GremlinServerError as ex:
        assert ex.status_message
        assert ex.status_exception
        assert str(ex) == f"{ex.status_code}: {ex.status_message}"
    else:
        # reaching this point means no error was raised
        assert False
    # the same client must keep working after the failure
    ages = client.submit('g.V(x).values("age")', {'x': 1}).all().result()
    assert ages[0] == 29
def test_client_connection_pool_after_error(client):
    """After a server error the single pooled connection is returned and remains usable."""
    # Overwrite fixture with pool_size=1 client
    client = Client(test_no_auth_url, 'gmodern', pool_size=1)
    try:
        # should fire an exception
        with pytest.raises(GremlinServerError) as exc_info:
            client.submit('g.inject(1).math("_/0")').all().result()
        # expecting the pool size to be 1 again after query returned
        assert exc_info.value.status_code == 500
        assert client.available_pool_size == 1
        # still can submit after failure
        assert client.submit('g.V(x).values("age")', {'x': 1}).all().result()[0] == 29
    finally:
        # The client built here is not managed by the fixture, so release it explicitly.
        client.close()
def test_client_no_hang_if_submit_on_closed(client):
    """Submitting on a closed client must raise instead of hanging."""
    assert client.submit('g.inject(2)').all().result()[0] == 2
    client.close()
    # pytest.raises fails the test when nothing is raised. The earlier
    # ``assert False`` inside ``try``/``except Exception`` was caught by its own handler,
    # so the test passed whether or not the submit failed.
    with pytest.raises(Exception):
        client.submit('g.inject(2)').all().result()
def test_client_close_all_connection_in_pool(client):
    """Closing a client drops its available pool size from one to zero."""
    pooled = Client(test_no_auth_url, 'g', pool_size=1)
    assert pooled.available_pool_size == 1
    # run one query before closing
    pooled.submit('g.inject(4)').all().result()
    pooled.close()
    assert pooled.available_pool_size == 0
def test_client_side_timeout_set_for_aiohttp(client):
    """A one-second transport timeout aborts a two-second script; the client stays usable."""
    client = Client(test_no_auth_url, 'gmodern',
                    transport_factory=lambda: AiohttpHTTPTransport(read_timeout=1, write_timeout=1))
    try:
        # should fire an exception
        with pytest.raises(TimeoutError) as exc_info:
            client.submit('Thread.sleep(2000);1',
                          request_options={'language': 'gremlin-groovy'}).all().result()
        # asyncio TimeoutError has no message.
        assert str(exc_info.value) == ""
        # still can submit after failure
        assert client.submit('g.V(x).values("age")', {'x': 1}).all().result()[0] == 29
    finally:
        # The client built here is not managed by the fixture, so release it explicitly.
        client.close()
async def async_connect(enable):
    """Open and close an aiohttp transport from inside a coroutine.

    :param enable: forwarded as ``call_from_event_loop`` to ``AiohttpHTTPTransport``.
    :return: ``True`` when create/connect/close all succeed, ``False`` on ``RuntimeError``.
    """
    try:
        transport = AiohttpHTTPTransport(call_from_event_loop=enable)
        transport.connect(test_no_auth_url)
        transport.close()
    except RuntimeError:
        return False
    return True
def test_from_event_loop():
    """Connecting from a running loop fails unless ``call_from_event_loop`` is enabled."""
    connected_without_flag = asyncio.get_event_loop().run_until_complete(async_connect(False))
    assert not connected_without_flag
    connected_with_flag = asyncio.get_event_loop().run_until_complete(async_connect(True))
    assert connected_with_flag
def test_client_submit(client):
    """Check element materialization with and without ``materializeProperties`` set to tokens."""
    # Default submission: the vertex arrives with its properties.
    found = client.submit('g.V(1)').all().result()
    assert 1 == len(found)
    marko = found[0]
    assert type(marko) is Vertex
    assert 1 == marko.id
    assert 2 == len(marko.properties)
    assert 'name' == marko.properties[0].key
    assert 'marko' == marko.properties[0].value

    # "tokens" on a vertex: no properties are returned.
    found = client.submit('g.with("materializeProperties", "tokens").V(1)').all().result()
    assert 1 == len(found)
    bare_vertex = found[0]
    assert 1 == bare_vertex.id
    assert 0 == len(bare_vertex.properties)

    # "tokens" on an edge: no properties are returned.
    found = client.submit('g.with("materializeProperties", "tokens").E(7)').all().result()
    assert 1 == len(found)
    bare_edge = found[0]
    assert 7 == bare_edge.id
    assert 0 == len(bare_edge.properties)

    # "tokens" on vertex properties: each one comes back without nested properties.
    found = client.submit('g.with("materializeProperties", "tokens").V(1).properties()').all().result()
    assert 2 == len(found)
    for vertex_property in found:
        assert 0 == len(vertex_property.properties)
def test_client_gremlin_lang(client):
    """A ``RequestMessage`` built from ``g.V()`` returns six results through the client."""
    source = GraphTraversalSource(Graph(), TraversalStrategies())
    request = create_basic_request_message(source.V())
    outcome = client.submit(request)
    assert len(outcome.all().result()) == 6
def test_client_gremlin_lang_options(client):
    """Smoke-test that traversals carrying ``OptionsStrategy`` settings still execute."""
    # Only a smoke test: an integration test cannot inspect the strategy internals, so it
    # merely checks that the request is accepted and returns the expected result count.
    g = GraphTraversalSource(Graph(), TraversalStrategies())

    # Options supplied through an explicit OptionsStrategy instance.
    via_strategy = g.with_strategies(OptionsStrategy(**{"x": "test", "y": True})).V()
    outcome = client.submit(create_basic_request_message(via_strategy))
    assert len(outcome.all().result()) == 6

    # The same options supplied through chained with_() calls.
    via_with = g.with_("x", "test").with_("y", True).V()
    outcome = client.submit(create_basic_request_message(via_with))
    assert len(outcome.all().result()) == 6
def test_client_gremlin_lang_request_options_with_binding(client):
    """Exercise ``params`` and ``bindings`` in request options for traversal and script requests."""
    g = GraphTraversalSource(Graph(), TraversalStrategies())
    # The constructed traversal binds x through a GValue.
    counting = g.with_('language', 'gremlin-lang').V(GValue('x', [1, 2, 3])).count()
    request_opts = {'language': 'gremlin-lang', 'params': {'x': [1, 2, 3]}}
    request = create_basic_request_message(counting)
    outcome = client.submit(request, request_options=request_opts)
    assert outcome.all().result()[0] == 3

    # The same request options are reused for a plain script submission.
    outcome = client.submit('g.V(x).values("name")', request_options=request_opts)
    assert outcome.all().result()[0] == 'marko'

    # For script submissions, bindings added to the request options are applied as well.
    request_opts['bindings'] = {'y': 4}
    outcome = client.submit('g.V(y).values("name")', request_options=request_opts)
    assert outcome.all().result()[0] == 'josh'

    # Keyword bindings can be combined with the request options.
    outcome = client.submit('g.V(z).values("name")', bindings={'z': 5}, request_options=request_opts)
    assert outcome.all().result()[0] == 'ripple'
def test_iterate_result_set(client):
    """Iterating a result set and concatenating what it yields gives six results for ``g.V()``."""
    source = GraphTraversalSource(Graph(), TraversalStrategies())
    request = create_basic_request_message(source.V())
    collected = []
    for chunk in client.submit(request):
        collected.extend(chunk)
    assert len(collected) == 6
def test_client_async(client):
    """``submit_async`` returns a future whose result set holds six results for ``g.V()``."""
    source = GraphTraversalSource(Graph(), TraversalStrategies())
    request = create_basic_request_message(source.V())
    pending = client.submit_async(request)
    outcome = pending.result()
    assert len(outcome.all().result()) == 6
def test_connection_share(client):
    """Two async submissions on a pool of one connection both complete, in order."""
    # Overwrite fixture with pool_size=1 client
    client = Client(test_no_auth_url, 'gmodern', pool_size=1)
    try:
        g = GraphTraversalSource(Graph(), TraversalStrategies())
        t = g.V()
        message = create_basic_request_message(t)
        message2 = create_basic_request_message(t)
        future = client.submit_async(message)
        future2 = client.submit_async(message2)

        result_set2 = future2.result()
        assert len(result_set2.all().result()) == 6

        # This future has to finish for the second to yield result - pool_size=1
        assert future.done()
        result_set = future.result()
        assert len(result_set.all().result()) == 6
    finally:
        # The client built here is not managed by the fixture, so release it explicitly.
        client.close()
def test_multi_conn_pool(client):
    """Two async submissions on a locally created client both return six results."""
    g = GraphTraversalSource(Graph(), TraversalStrategies())
    t = g.V()
    message = create_basic_request_message(t)
    message2 = create_basic_request_message(t)
    client = Client(test_no_auth_url, 'g', pool_size=1)
    try:
        future = client.submit_async(message)
        future2 = client.submit_async(message2)

        result_set2 = future2.result()
        assert len(result_set2.all().result()) == 6

        # with connection pool `future` may or may not be done here
        result_set = future.result()
        assert len(result_set.all().result()) == 6
    finally:
        # The client built here is not managed by the fixture, so release it explicitly.
        client.close()
def test_multi_thread_pool(client):
    """Four threads sharing one client each receive the complete result of their traversal."""
    g = GraphTraversalSource(Graph(), TraversalStrategies())
    traversals = [g.V(),
                  g.V().count(),
                  g.E(),
                  g.E().count()
                  ]
    results = [[] for _ in traversals]

    # Release the group of threads together, which should also inject some non-determinism
    # into the run-time execution order. An Event is used instead of a Condition because a
    # worker that reaches wait() after set() returns immediately, whereas a notify_all()
    # issued before a worker starts waiting is lost and leaves it blocked for the full timeout.
    start_signal = threading.Event()

    def thread_run(tr, result_list):
        message = create_basic_request_message(tr)
        start_signal.wait(5)
        result_set = client.submit(message)
        for result in result_set:
            result_list.append(result)

    threads = []
    for i, (tr, result_list) in enumerate(zip(traversals, results)):
        thread = threading.Thread(target=thread_run,
                                  args=(tr, result_list),
                                  name="test_multi_thread_pool_%d" % i)
        thread.daemon = True
        threads.append(thread)
        thread.start()
    start_signal.set()

    for t in threads:
        t.join(5)
    # Fail with a clear message instead of an IndexError on a result list that is still empty.
    unfinished = [t.name for t in threads if t.is_alive()]
    assert not unfinished, "threads did not finish in time: %s" % unfinished

    assert len(results[0][0]) == 6
    assert results[1][0][0] == 6
    assert len(results[2][0]) == 6
    assert results[3][0][0] == 6
def test_client_gremlin_lang_with_short(client):
    """A traversal using a ``short`` literal is accepted and yields a single count result."""
    source = GraphTraversalSource(Graph(), TraversalStrategies())
    counting = source.with_('language', 'gremlin-lang').V().has('age', short(16)).count()
    options = {'language': 'gremlin-lang'}
    request = create_basic_request_message(counting)
    collected = []
    for chunk in client.submit(request, request_options=options):
        collected.extend(chunk)
    assert len(collected) == 1
def test_client_gremlin_lang_with_long(client):
    """A traversal using a ``long`` literal is accepted and yields a single count result."""
    source = GraphTraversalSource(Graph(), TraversalStrategies())
    counting = source.V().has('age', long(851401972585122)).count()
    options = {}
    request = create_basic_request_message(counting)
    collected = []
    for chunk in client.submit(request, request_options=options):
        collected.extend(chunk)
    assert len(collected) == 1
def test_client_gremlin_lang_with_bigint(client):
    """A traversal using a ``bigint`` literal is accepted and yields a single count result."""
    source = GraphTraversalSource(Graph(), TraversalStrategies())
    counting = source.with_('language', 'gremlin-lang').V().has(
        'age', bigint(0x1000_0000_0000_0000_0000)).count()
    options = {'language': 'gremlin-lang'}
    request = create_basic_request_message(counting)
    collected = []
    for chunk in client.submit(request, request_options=options):
        collected.extend(chunk)
    assert len(collected) == 1
def test_big_result_set(client):
    """Add 20000 vertices to ``g`` and read back result sets of increasing size."""
    g = GraphTraversalSource(Graph(), TraversalStrategies())

    def collect(traversal):
        # Submit against source 'g' and concatenate everything the result set yields.
        request = create_basic_request_message(traversal, source='g')
        gathered = []
        for chunk in client.submit(request):
            gathered.extend(chunk)
        return gathered

    # Populate the graph; the traversal ends in count(), so one value comes back.
    populate = g.inject(1).repeat(__.add_v('person').property('name', __.loops())).times(20000).count()
    assert len(collect(populate)) == 1

    # Each limit must return exactly that many vertices.
    for size in (10, 100, 1000, 10000):
        assert len(collect(g.V().limit(size))) == size
def test_big_result_set_secure(authenticated_client):
    """Add 20000 vertices to ``g`` and read back growing result sets via the authenticated client."""
    g = GraphTraversalSource(Graph(), TraversalStrategies())

    def collect(traversal):
        # Submit against source 'g' and concatenate everything the result set yields.
        request = create_basic_request_message(traversal, source='g')
        gathered = []
        for chunk in authenticated_client.submit(request):
            gathered.extend(chunk)
        return gathered

    # Populate the graph; the traversal ends in count(), so one value comes back.
    populate = g.inject(1).repeat(__.add_v('person').property('name', __.loops())).times(20000).count()
    assert len(collect(populate)) == 1

    # Each limit must return exactly that many vertices.
    for size in (10, 100, 1000, 10000):
        assert len(collect(g.V().limit(size))) == size
async def asyncio_func():
    """Trivial coroutine that resolves to ``1``; used to exercise the event loop."""
    outcome = 1
    return outcome
def test_asyncio(client):
    """Running a coroutine on the event loop must not raise ``RuntimeError`` while a client exists."""
    loop_failed = False
    try:
        asyncio.get_event_loop().run_until_complete(asyncio_func())
    except RuntimeError:
        loop_failed = True
    assert not loop_failed
# TODO: tests pass because requestID is now generated on HTTP server and this option gets ignored, tests to be removed
# or updated depending on if we still want to use requestID or not
@pytest.mark.skip(reason="requestID is generated on server side only, disable for now")
def test_client_custom_invalid_request_id_graphson_script(client):
    """Skipped: a malformed ``requestId`` on a script, if rejected, reports a bad UUID string."""
    local_client = Client(test_no_auth_url, 'gmodern')
    options = {"requestId": "malformed"}
    try:
        local_client.submit('g.V()', request_options=options).all().result()
    except Exception as ex:
        assert "badly formed hexadecimal UUID string" in str(ex)
@pytest.mark.skip(reason="requestID is generated on server side only, disable for now")
def test_client_custom_invalid_request_id_graphbinary_script(client):
    """Skipped: a malformed ``requestId`` on a script, if rejected, reports a bad UUID string."""
    local_client = Client(test_no_auth_url, 'gmodern')
    options = {"requestId": "malformed"}
    try:
        local_client.submit('g.V()', request_options=options).all().result()
    except Exception as ex:
        assert "badly formed hexadecimal UUID string" in str(ex)
@pytest.mark.skip(reason="requestID is generated on server side only, disable for now")
def test_client_custom_valid_request_id_script_uuid(client):
    """Skipped: a ``uuid.UUID`` request id is accepted and ``g.V()`` returns six results."""
    options = {"requestId": uuid.uuid4()}
    found = client.submit('g.V()', request_options=options).all().result()
    assert len(found) == 6
@pytest.mark.skip(reason="requestID is generated on server side only, disable for now")
def test_client_custom_valid_request_id_script_string(client):
    """Skipped: a UUID given as a string is accepted and ``g.V()`` returns six results."""
    options = {"requestId": str(uuid.uuid4())}
    found = client.submit('g.V()', request_options=options).all().result()
    assert len(found) == 6
@pytest.mark.skip(reason="requestID is generated on server side only, disable for now")
def test_client_custom_invalid_request_id_graphson_bytecode(client):
    """Skipped: a malformed ``requestId`` on a traversal, if rejected, reports a bad UUID string."""
    local_client = Client(test_no_auth_url, 'gmodern')
    source = GraphTraversalSource(Graph(), TraversalStrategies())
    query = source.V().bytecode
    options = {"requestId": "malformed"}
    try:
        local_client.submit(query, request_options=options).all().result()
    except Exception as ex:
        assert "badly formed hexadecimal UUID string" in str(ex)
@pytest.mark.skip(reason="requestID is generated on server side only, disable for now")
def test_client_custom_invalid_request_id_graphbinary_bytecode(client):
    """Skipped: a malformed ``requestId`` on a traversal, if rejected, reports a bad UUID string."""
    local_client = Client(test_no_auth_url, 'gmodern')
    source = GraphTraversalSource(Graph(), TraversalStrategies())
    query = source.V().bytecode
    options = {"requestId": "malformed"}
    try:
        local_client.submit(query, request_options=options).all().result()
    except Exception as ex:
        assert "badly formed hexadecimal UUID string" in str(ex)
@pytest.mark.skip(reason="requestID is generated on server side only, disable for now")
def test_client_custom_valid_request_id_bytecode(client):
    """Skipped: submitting the ``bytecode`` of ``g.V()`` returns six results."""
    source = GraphTraversalSource(Graph(), TraversalStrategies())
    query = source.V().bytecode
    found = client.submit(query).all().result()
    assert len(found) == 6
def test_response_serializer_never_None():
    """A client built with ``response_serializer=None`` still reports a non-None serializer."""
    unconfigured = Client('url', 'g', response_serializer=None)
    effective = unconfigured.response_serializer()
    assert effective is not None
def test_serializer_and_interceptor_forwarded(client_with_interceptor):
    """The fixture's interceptor rewrites the request, so ``g.inject(1)`` comes back as ``[2]``."""
    submitted = client_with_interceptor.submit("g.inject(1)")
    first = submitted.next()
    # interceptor changes request to g.inject(2)
    assert [2] == first