blob: 7dffcb8303403cca60309ca829e30c22baf7ff0e [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
__author__ = 'Marko A. Rodriguez (http://markorodriguez.com), Lyndon Bauto (lyndonb@bitquilltech.com)'
import os
import pytest
from pytest import fail
from gremlin_python.driver import serializer
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.structure.graph import Graph
from gremlin_python.process.anonymous_traversal import traversal
from gremlin_python.process.traversal import P
from gremlin_python.process.traversal import Binding, Bindings
from gremlin_python.process.graph_traversal import __
gremlin_server_url = 'ws://localhost:{}/gremlin'
anonymous_url = gremlin_server_url.format(45940)
def transactions_disabled():
return (os.environ['TEST_TRANSACTIONS'] != 'true') if 'TEST_TRANSACTIONS' in os.environ else False
class TestTraversal(object):
def test_bytecode(self):
g = traversal().withGraph(Graph())
bytecode = g.V().out("created").bytecode
assert 0 == len(bytecode.bindings.keys())
assert 0 == len(bytecode.source_instructions)
assert 2 == len(bytecode.step_instructions)
assert "V" == bytecode.step_instructions[0][0]
assert "out" == bytecode.step_instructions[1][0]
assert "created" == bytecode.step_instructions[1][1]
assert 1 == len(bytecode.step_instructions[0])
assert 2 == len(bytecode.step_instructions[1])
##
bytecode = g.withSack(1).E().groupCount().by("weight").bytecode
assert 0 == len(bytecode.bindings.keys())
assert 1 == len(bytecode.source_instructions)
assert "withSack" == bytecode.source_instructions[0][0]
assert 1 == bytecode.source_instructions[0][1]
assert 3 == len(bytecode.step_instructions)
assert "E" == bytecode.step_instructions[0][0]
assert "groupCount" == bytecode.step_instructions[1][0]
assert "by" == bytecode.step_instructions[2][0]
assert "weight" == bytecode.step_instructions[2][1]
assert 1 == len(bytecode.step_instructions[0])
assert 1 == len(bytecode.step_instructions[1])
assert 2 == len(bytecode.step_instructions[2])
##
bytecode = g.V(Bindings.of('a', [1, 2, 3])) \
.out(Bindings.of('b', 'created')) \
.where(__.in_(Bindings.of('c', 'created'), Bindings.of('d', 'knows')) \
.count().is_(Bindings.of('e', P.gt(2)))).bytecode
assert 5 == len(bytecode.bindings.keys())
assert [1, 2, 3] == bytecode.bindings['a']
assert 'created' == bytecode.bindings['b']
assert 'created' == bytecode.bindings['c']
assert 'knows' == bytecode.bindings['d']
assert P.gt(2) == bytecode.bindings['e']
assert Binding('b', 'created') == bytecode.step_instructions[1][1]
assert 'binding[b=created]' == str(bytecode.step_instructions[1][1])
assert isinstance(hash(bytecode.step_instructions[1][1]), int)
def test_P(self):
# verify that the order of operations is respected
assert "and(eq(a),lt(b))" == str(P.eq("a").and_(P.lt("b")))
assert "and(or(lt(b),gt(c)),neq(d))" == str(P.lt("b").or_(P.gt("c")).and_(P.neq("d")))
assert "and(or(lt(b),gt(c)),or(neq(d),gte(e)))" == str(
P.lt("b").or_(P.gt("c")).and_(P.neq("d").or_(P.gte("e"))))
def test_anonymous_traversal(self):
bytecode = __.__(1).bytecode
assert 0 == len(bytecode.bindings.keys())
assert 0 == len(bytecode.source_instructions)
assert 1 == len(bytecode.step_instructions)
assert "inject" == bytecode.step_instructions[0][0]
assert 1 == bytecode.step_instructions[0][1]
##
bytecode = __.start().bytecode
assert 0 == len(bytecode.bindings.keys())
assert 0 == len(bytecode.source_instructions)
assert 0 == len(bytecode.step_instructions)
def test_clone_traversal(self):
g = traversal().withGraph(Graph())
original = g.V().out("created")
clone = original.clone().out("knows")
cloneClone = clone.clone().out("created")
assert 2 == len(original.bytecode.step_instructions)
assert 3 == len(clone.bytecode.step_instructions)
assert 4 == len(cloneClone.bytecode.step_instructions)
original.has("person", "name", "marko")
clone.V().out()
assert 3 == len(original.bytecode.step_instructions)
assert 5 == len(clone.bytecode.step_instructions)
assert 4 == len(cloneClone.bytecode.step_instructions)
def test_no_sugar_for_magic_methods(self):
g = traversal().withGraph(Graph())
t = g.V().age
assert 2 == len(t.bytecode.step_instructions)
try:
t = g.V().__len__
fail("can't do sugar with magic")
except AttributeError as err:
assert str(
err) == 'Python magic methods or keys starting with double underscore cannot be used for Gremlin sugar - prefer values(__len__)'
def test_enforce_anonymous_child_traversal(self):
g = traversal().withGraph(Graph())
g.V(0).addE("self").to(__.V(1))
try:
g.V(0).addE("self").to(g.V(1))
assert False
except TypeError:
pass
@pytest.mark.skipif(transactions_disabled(), reason="Transactions are not enabled.")
def test_transaction_commit(self, remote_transaction_connection):
# Start a transaction traversal.
g = traversal().withRemote(remote_transaction_connection)
start_count = g.V().count().next()
tx = g.tx()
# Except transaction to not be open until begin is called.
assert not tx.isOpen()
gtx = tx.begin()
assert tx.isOpen()
add_node_validate_transaction_state(g, gtx, start_count, start_count, [tx])
add_node_validate_transaction_state(gtx, g, start_count + 2, start_count, [tx])
# Commit the transaction, this should close it and add our transaction only vertex to the graph,
# so our vertex count outside the transaction should be 2 + the start count.
tx.commit()
assert not tx.isOpen()
assert g.V().count().next() == start_count + 2
drop_graph_check_count(g)
verify_gtx_closed(gtx)
@pytest.mark.skipif(transactions_disabled(), reason="Transactions are not enabled.")
def test_transaction_rollback(self, remote_transaction_connection):
# Start a transaction traversal.
g = traversal().withRemote(remote_transaction_connection)
start_count = g.V().count().next()
tx = g.tx()
# Except transaction to not be open until begin is called.
assert not tx.isOpen()
gtx = tx.begin()
assert tx.isOpen()
add_node_validate_transaction_state(g, gtx, start_count, start_count, [tx])
add_node_validate_transaction_state(gtx, g, start_count + 2, start_count, [tx])
# Commit the transaction, this should close it and not add our transaction only vertex to the graph,
# so our vertex count outside the transaction should be the start count.
tx.rollback()
assert not tx.isOpen()
assert g.V().count().next() == start_count + 1
drop_graph_check_count(g)
verify_gtx_closed(gtx)
@pytest.mark.skipif(transactions_disabled(), reason="Transactions are not enabled.")
def test_transaction_no_begin(self, remote_transaction_connection):
# Start a transaction traversal.
g = traversal().withRemote(remote_transaction_connection)
tx = g.tx()
# Except transaction to not be open until begin is called.
assert not tx.isOpen()
try:
# Attempt to commit even though no tx is started.
tx().commit()
assert False
except Exception as e:
assert not tx.isOpen()
try:
# Attempt to rollback even though no tx is started.
tx().rollback()
assert False
except Exception as e:
assert not tx.isOpen()
try:
# Attempt to invoke tx().tx() which is illegal.
tx().tx()
assert False
except Exception as e:
assert not tx.isOpen()
# Create transaction and verify it is open.
gtx = tx.begin()
assert tx.isOpen()
try:
# Attempt to begin gtx which is illegal.
gtx().begin()
assert False
except Exception as e:
assert tx.isOpen()
# Commit this unused transaction and verify it is no longer open.
tx.commit()
assert not tx.isOpen()
# Create another transaction and verify it is open.
gtx = tx.begin()
assert tx.isOpen()
# Rollback this unused transaction and verify it is no longer open.
tx.rollback()
assert not tx.isOpen()
@pytest.mark.skipif(transactions_disabled(), reason="Transactions are not enabled.")
def test_multi_commit_transaction(self, remote_transaction_connection):
# Start a transaction traversal.
g = traversal().withRemote(remote_transaction_connection)
start_count = g.V().count().next()
# Create two transactions.
tx1, tx2 = g.tx(), g.tx()
# Generate two GraphTraversalSource's for each transaction with begin.
gtx1, gtx2 = tx1.begin(), tx2.begin()
verify_tx_state([tx1, tx2], True)
# Add node to gtx1, which should be visible to gtx1, not gtx2.
add_node_validate_transaction_state(g, gtx1, start_count, start_count, [tx1, tx2])
# Add node to gtx2, which should be visible to gtx2, not gtx2
add_node_validate_transaction_state(g, gtx2, start_count, start_count, [tx1, tx2])
# Add node to gtx1, which should be visible to gtx1, not gtx2. Note previous node also added.
add_node_validate_transaction_state(g, gtx1, start_count, start_count + 1, [tx1, tx2])
tx1.commit()
verify_tx_state([tx1], False)
verify_tx_state([tx2], True)
assert g.V().count().next() == start_count + 2
tx2.commit()
verify_tx_state([tx1, tx2], False)
assert g.V().count().next() == start_count + 3
@pytest.mark.skipif(transactions_disabled(), reason="Transactions are not enabled.")
def test_multi_rollback_transaction(self, remote_transaction_connection):
# Start a transaction traversal.
g = traversal().withRemote(remote_transaction_connection)
start_count = g.V().count().next()
# Create two transactions.
tx1, tx2 = g.tx(), g.tx()
# Generate two GraphTraversalSource's for each transaction with begin.
gtx1, gtx2 = tx1.begin(), tx2.begin()
verify_tx_state([tx1, tx2], True)
# Add node to gtx1, which should be visible to gtx1, not gtx2.
add_node_validate_transaction_state(g, gtx1, start_count, start_count, [tx1, tx2])
# Add node to gtx2, which should be visible to gtx2, not gtx2
add_node_validate_transaction_state(g, gtx2, start_count, start_count, [tx1, tx2])
# Add node to gtx1, which should be visible to gtx1, not gtx2. Note previous node also added.
add_node_validate_transaction_state(g, gtx1, start_count, start_count + 1, [tx1, tx2])
tx1.rollback()
verify_tx_state([tx1], False)
verify_tx_state([tx2], True)
assert g.V().count().next() == start_count
tx2.rollback()
verify_tx_state([tx1, tx2], False)
assert g.V().count().next() == start_count
@pytest.mark.skipif(transactions_disabled(), reason="Transactions are not enabled.")
def test_multi_commit_and_rollback(self, remote_transaction_connection):
# Start a transaction traversal.
g = traversal().withRemote(remote_transaction_connection)
start_count = g.V().count().next()
# Create two transactions.
tx1, tx2 = g.tx(), g.tx()
# Generate two GraphTraversalSource's for each transaction with begin.
gtx1, gtx2 = tx1.begin(), tx2.begin()
verify_tx_state([tx1, tx2], True)
# Add node to gtx1, which should be visible to gtx1, not gtx2.
add_node_validate_transaction_state(g, gtx1, start_count, start_count, [tx1, tx2])
# Add node to gtx2, which should be visible to gtx2, not gtx2
add_node_validate_transaction_state(g, gtx2, start_count, start_count, [tx1, tx2])
# Add node to gtx1, which should be visible to gtx1, not gtx2. Note previous node also added.
add_node_validate_transaction_state(g, gtx1, start_count, start_count + 1, [tx1, tx2])
tx1.commit()
verify_tx_state([tx1], False)
verify_tx_state([tx2], True)
assert g.V().count().next() == start_count + 2
tx2.rollback()
verify_tx_state([tx1, tx2], False)
assert g.V().count().next() == start_count + 2
@pytest.mark.skipif(transactions_disabled(), reason="Transactions are not enabled.")
def test_transaction_close_tx(self):
remote_conn = create_connection_to_gtx()
g = traversal().withRemote(remote_conn)
drop_graph_check_count(g)
tx1 = g.tx()
tx2 = g.tx()
# open up two sessions and create stuff
gtx1 = tx1.begin()
gtx2 = tx2.begin()
add_node_validate_transaction_state(g, gtx1, 0, 0, [tx1, tx2])
add_node_validate_transaction_state(g, gtx2, 0, 0, [tx1, tx2])
add_node_validate_transaction_state(g, gtx2, 0, 1, [tx1, tx2])
add_node_validate_transaction_state(g, gtx2, 0, 2, [tx1, tx2])
# someone gets lazy and doesn't commit/rollback and just calls close() - the graph
# will decide how to treat the transaction, but for neo4j/gremlin server in this
# test configuration it should rollback
tx1.close()
tx2.close()
assert not tx1.isOpen()
assert not tx2.isOpen()
verify_gtx_closed(gtx1)
verify_gtx_closed(gtx2)
remote_conn = create_connection_to_gtx()
g = traversal().withRemote(remote_conn)
assert g.V().count().next() == 0
drop_graph_check_count(g)
@pytest.mark.skipif(transactions_disabled(), reason="Transactions are not enabled.")
def test_transaction_close_tx_from_parent(self):
remote_conn = create_connection_to_gtx()
g = traversal().withRemote(remote_conn)
drop_graph_check_count(g)
tx1 = g.tx()
tx2 = g.tx()
# open up two sessions and create stuff
gtx1 = tx1.begin()
gtx2 = tx2.begin()
add_node_validate_transaction_state(g, gtx1, 0, 0, [tx1, tx2])
add_node_validate_transaction_state(g, gtx2, 0, 0, [tx1, tx2])
add_node_validate_transaction_state(g, gtx2, 0, 1, [tx1, tx2])
add_node_validate_transaction_state(g, gtx2, 0, 2, [tx1, tx2])
# someone gets lazy and doesn't commit/rollback and just calls close() but on the parent
# DriverRemoteConnection for all the session that were created via tx() - the graph
# will decide how to treat the transaction, but for neo4j/gremlin server in this
# test configuration it should rollback
remote_conn.close()
assert not tx1.isOpen()
assert not tx2.isOpen()
verify_gtx_closed(gtx1)
verify_gtx_closed(gtx2)
remote_conn = create_connection_to_gtx()
g = traversal().withRemote(remote_conn)
assert g.V().count().next() == 0
drop_graph_check_count(g)
def create_connection_to_gtx():
return DriverRemoteConnection(anonymous_url, 'gtx',
message_serializer=serializer.GraphBinarySerializersV1())
def add_node_validate_transaction_state(g, g_add_to, g_start_count, g_add_to_start_count, tx_verify_list):
# Add a single node to g_add_to, but not g.
# Check that vertex count in g is g_start_count and vertex count in g_add_to is g_add_to_start_count + 1.
g_add_to.addV("person").property("name", "lyndon").iterate()
assert g_add_to.V().count().next() == g_add_to_start_count + 1
assert g.V().count().next() == g_start_count
verify_tx_state(tx_verify_list, True)
def verify_tx_state(gtx_list, value):
for tx in gtx_list:
assert tx.isOpen() == value
def drop_graph_check_count(g):
g.V().drop().iterate()
assert g.V().count().next() == 0
def verify_gtx_closed(gtx):
try:
# Attempt to add an additional vertex to the transaction. This should throw an exception since it
# has been rolled back.
gtx().addV("failure").iterate()
assert False
except Exception as e:
pass