blob: e62aeeacff14aa86e9a178d2f04634c444418ca8 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from age.models import Vertex
import unittest
import decimal
import age
import argparse
DSN = "host=localhost port=5432 dbname=postgres user=dehowefeng password=agens"
TEST_HOST = "localhost"
TEST_PORT = 5432
TEST_DB = "postgres"
TEST_USER = "postgres"
TEST_PASSWORD = "agens"
TEST_GRAPH_NAME = "test_graph"
class TestAgeBasic(unittest.TestCase):
ag = None
def setUp(self):
print("Connecting to Test Graph.....")
self.ag = age.connect(graph=TEST_GRAPH_NAME, host=TEST_HOST, port=TEST_PORT, dbname=TEST_DB, user=TEST_USER, password=TEST_PASSWORD)
def tearDown(self):
# Clear test data
print("Deleting Test Graph.....")
age.deleteGraph(self.ag.connection, self.ag.graphName)
self.ag.close()
def testExec(self):
print("\n---------------------------------------------------")
print("Test 1: Checking single and multi column Returns.....")
print("---------------------------------------------------\n")
ag = self.ag
# Create and Return single column
cursor = ag.execCypher("CREATE (n:Person {name: %s, title: 'Developer'}) RETURN n", params=('Andy',))
for row in cursor:
print("Vertex: %s , Type: %s " % (Vertex, type(row[0])))
# Create and Return multi columns
cursor = ag.execCypher("CREATE (n:Person {name: %s, title: %s}) RETURN id(n), n.name", cols=['id','name'], params=('Jack','Manager'))
row = cursor.fetchone()
print("Id: %s , Name: %s" % (row[0], row[1]))
self.assertEqual(int, type(row[0]))
ag.commit()
print("\nTest 1 Successful....")
def testQuery(self):
print("\n--------------------------------------------------")
print("Test 2: Testing CREATE and query relationships....." )
print("--------------------------------------------------\n")
ag = self.ag
ag.execCypher("CREATE (n:Person {name: %s}) ", params=('Jack',))
ag.execCypher("CREATE (n:Person {name: %s}) ", params=('Andy',))
ag.execCypher("CREATE (n:Person {name: %s}) ", params=('Smith',))
ag.execCypher("MATCH (a:Person), (b:Person) WHERE a.name = 'Andy' AND b.name = 'Jack' CREATE (a)-[r:worksWith {weight: 3}]->(b)")
ag.execCypher("""MATCH (a:Person), (b:Person)
WHERE a.name = %s AND b.name = %s
CREATE p=((a)-[r:worksWith]->(b)) """, params=('Jack', 'Smith',))
ag.commit()
cursor = ag.execCypher("MATCH p=()-[:worksWith]-() RETURN p")
for row in cursor:
path = row[0]
print("START:", path[0])
print("EDGE:", path[1])
print("END:", path[2])
cursor = ag.execCypher("MATCH p=(a)-[b]-(c) WHERE b.weight>2 RETURN a,label(b), b.weight, c", cols=["a","bl","bw", "c"], params=(2,))
for row in cursor:
start = row[0]
edgel = row[1]
edgew = row[2]
end = row[3]
print("Relationship: %s %s %s. Edge weight: %s" % (start["name"] , edgel,end["name"], edgew))
#Assert that the weight of the edge is greater than 2
self.assertEqual(edgew > 2, True)
print("\nTest 2 Successful...")
def testChangeData(self):
print("\n-------------------------------------------------------")
print("Test 3: Testing changes in data using SET and REMOVE.....")
print("-------------------------------------------------------\n")
ag = self.ag
# Create Vertices
# Commit automatically
ag.execCypher("CREATE (n:Person {name: 'Joe'})")
cursor = ag.execCypher("CREATE (n:Person {name: %s, title: 'Developer'}) RETURN n", params=('Smith',))
row = cursor.fetchone()
print("CREATED: ", row[0])
# You must commit explicitly
ag.commit()
cursor = ag.execCypher("MATCH (n:Person {name: %s}) SET n.title=%s RETURN n", params=('Smith','Manager',))
row = cursor.fetchone()
vertex = row[0]
title1 = vertex["title"]
print("SET title: ", title1)
ag.commit()
cursor = ag.execCypher("MATCH (p:Person {name: 'Smith'}) RETURN p.title")
row = cursor.fetchone()
title2 = row[0]
self.assertEqual(title1, title2)
cursor = ag.execCypher("MATCH (n:Person {name: %s}) SET n.bigNum=-6.45161e+46::numeric RETURN n", params=('Smith',))
row = cursor.fetchone()
vertex = row[0]
for row in cursor:
print("SET bigNum: ", vertex['bigNum'])
bigNum1 = vertex["bigNum"]
self.assertEqual(decimal.Decimal("-6.45161e+46"), bigNum1)
ag.commit()
cursor = ag.execCypher("MATCH (p:Person {name: 'Smith'}) RETURN p.bigNum")
row = cursor.fetchone()
bigNum2 = row[0]
self.assertEqual(bigNum1, bigNum2)
cursor = ag.execCypher("MATCH (n:Person {name: %s}) REMOVE n.title RETURN n", params=('Smith',))
for row in cursor:
print("REMOVE Prop title: ", row[0])
#Assert that the title property is removed
self.assertIsNone(row[0].properties.get('title'))
print("\nTest 3 Successful....")
# You must commit explicitly
ag.commit()
def testCypher(self):
print("\n--------------------------")
print("Test 4: Testing Cypher.....")
print("--------------------------\n")
ag = self.ag
with ag.connection.cursor() as cursor:
try :
ag.cypher(cursor, "CREATE (n:Person {name: %s}) ", params=('Joe',))
ag.cypher(cursor, "CREATE (n:Person {name: %s}) ", params=('Jack',))
ag.cypher(cursor, "CREATE (n:Person {name: %s}) ", params=('Andy',))
ag.cypher(cursor, "CREATE (n:Person {name: %s}) ", params=('Smith',))
ag.cypher(cursor, "CREATE (n:Person {name: %s}) ", params=('Tom',))
# You must commit explicitly
ag.commit()
except Exception as ex:
print(ex)
ag.rollback()
with ag.connection.cursor() as cursor:
try :# Create Edges
ag.cypher(cursor,"MATCH (a:Person), (b:Person) WHERE a.name = 'Joe' AND b.name = 'Smith' CREATE (a)-[r:worksWith {weight: 3}]->(b)")
ag.cypher(cursor,"MATCH (a:Person), (b:Person) WHERE a.name = 'Andy' AND b.name = 'Tom' CREATE (a)-[r:worksWith {weight: 1}]->(b)")
ag.cypher(cursor,"MATCH (a:Person {name: 'Jack'}), (b:Person {name: 'Andy'}) CREATE (a)-[r:worksWith {weight: 5}]->(b)")
# You must commit explicitly
ag.commit()
except Exception as ex:
print(ex)
ag.rollback()
# With Params
cursor = ag.execCypher("""MATCH (a:Person), (b:Person)
WHERE a.name = %s AND b.name = %s
CREATE p=((a)-[r:worksWith]->(b)) RETURN p""",
params=('Andy', 'Smith',))
for row in cursor:
print("CREATED EDGE: %s" % row[0])
cursor = ag.execCypher("""MATCH (a:Person {name: 'Joe'}), (b:Person {name: 'Jack'})
CREATE p=((a)-[r:worksWith {weight: 5}]->(b))
RETURN p """)
for row in cursor:
print("CREATED EDGE WITH PROPERTIES: %s" % row[0])
self.assertEqual(row[0][1].properties['weight'], 5)
print("\nTest 4 Successful...")
def testMultipleEdges(self):
print("\n------------------------------------")
print("Test 5: Testing Multiple Edges.....")
print("------------------------------------\n")
ag = self.ag
with ag.connection.cursor() as cursor:
try :
ag.cypher(cursor, "CREATE (n:Country {name: %s}) ", params=('USA',))
ag.cypher(cursor, "CREATE (n:Country {name: %s}) ", params=('France',))
ag.cypher(cursor, "CREATE (n:Country {name: %s}) ", params=('Korea',))
ag.cypher(cursor, "CREATE (n:Country {name: %s}) ", params=('Russia',))
# You must commit explicitly after all executions.
ag.connection.commit()
except Exception as ex:
ag.rollback()
raise ex
with ag.connection.cursor() as cursor:
try :# Create Edges
ag.cypher(cursor,"MATCH (a:Country), (b:Country) WHERE a.name = 'USA' AND b.name = 'France' CREATE (a)-[r:distance {unit:'miles', value: 4760}]->(b)")
ag.cypher(cursor,"MATCH (a:Country), (b:Country) WHERE a.name = 'France' AND b.name = 'Korea' CREATE (a)-[r:distance {unit: 'km', value: 9228}]->(b)")
ag.cypher(cursor,"MATCH (a:Country {name: 'Korea'}), (b:Country {name: 'Russia'}) CREATE (a)-[r:distance {unit:'km', value: 3078}]->(b)")
# You must commit explicitly
ag.connection.commit()
except Exception as ex:
ag.rollback()
raise ex
cursor = ag.execCypher("""MATCH p=(:Country {name:"USA"})-[:distance]-(:Country)-[:distance]-(:Country)
RETURN p""")
count = 0
output = []
for row in cursor:
path = row[0]
for e in path:
if e.gtype == age.TP_VERTEX:
output.append(e.label + " " + e["name"])
elif e.gtype == age.TP_EDGE:
output.append("---- (distance " + str(e["value"]) + " " + e["unit"] + ") --->")
else:
output.append("Unknown element. " + str(e))
count += 1
formatted_output = " ".join(output)
print("PATH WITH MULTIPLE EDGES: %s" % formatted_output)
self.assertEqual(5,count)
print("\nTest 5 Successful...")
def testCollect(self):
print("\n--------------------------")
print("Test 6: Testing COLLECT.....")
print("--------------------------\n")
ag = self.ag
with ag.connection.cursor() as cursor:
try :
ag.cypher(cursor, "CREATE (n:Person {name: %s}) ", params=('Joe',))
ag.cypher(cursor, "CREATE (n:Person {name: %s}) ", params=('Jack',))
ag.cypher(cursor, "CREATE (n:Person {name: %s}) ", params=('Andy',))
ag.cypher(cursor, "CREATE (n:Person {name: %s}) ", params=('Smith',))
ag.cypher(cursor, "CREATE (n:Person {name: %s}) ", params=('Tom',))
# You must commit explicitly
ag.commit()
except Exception as ex:
print(ex)
ag.rollback()
with ag.connection.cursor() as cursor:
try :# Create Edges
ag.cypher(cursor,"MATCH (a:Person), (b:Person) WHERE a.name = 'Joe' AND b.name = 'Smith' CREATE (a)-[r:worksWith {weight: 3}]->(b)")
ag.cypher(cursor,"MATCH (a:Person), (b:Person) WHERE a.name = 'Joe' AND b.name = 'Tom' CREATE (a)-[r:worksWith {weight: 1}]->(b)")
ag.cypher(cursor,"MATCH (a:Person {name: 'Joe'}), (b:Person {name: 'Andy'}) CREATE (a)-[r:worksWith {weight: 5}]->(b)")
# You must commit explicitly
ag.commit()
except Exception as ex:
print(ex)
ag.rollback()
print(" -------- TESTING COLLECT #1 --------")
with ag.connection.cursor() as cursor:
ag.cypher(cursor, "MATCH (a)-[:worksWith]->(c) WITH a as V, COLLECT(c) as CV RETURN V.name, CV", cols=["V","CV"])
for row in cursor:
nm = row[0]
collected = row[1]
print(nm, "worksWith", [i["name"] for i in collected])
self.assertEqual(3,len(collected))
print(" -------- TESTING COLLECT #2 --------")
for row in ag.execCypher("MATCH (a)-[:worksWith]->(c) WITH a as V, COLLECT(c) as CV RETURN V.name, CV", cols=["V1","CV"]):
nm = row[0]
collected = row[1]
print(nm, "worksWith", [i["name"] for i in collected])
self.assertEqual(3,len(collected))
print("\nTest 6 Successful...")
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-host',
'--host',
help='Optional Host Name. Default Host is "127.0.0.1" ',
default="127.0.0.1")
parser.add_argument('-port',
'--port',
help='Optional Port Number. Default port no is 5432',
default=5432)
parser.add_argument('-db',
'--database',
help='Required Database Name',
required=True)
parser.add_argument('-u',
'--user',
help='Required Username Name',
required=True)
parser.add_argument('-pass',
'--password',
help='Required Password for authentication',
required=True)
parser.add_argument('-gn',
'--graphName',
help='Optional Graph Name to be created. Default graphName is "test_graph"',
default="test_graph")
args = parser.parse_args()
suite = unittest.TestSuite()
suite.addTest(TestAgeBasic('testExec'))
suite.addTest(TestAgeBasic('testQuery'))
suite.addTest(TestAgeBasic('testChangeData'))
suite.addTest(TestAgeBasic('testCypher'))
suite.addTest(TestAgeBasic('testMultipleEdges'))
suite.addTest(TestAgeBasic('testCollect'))
TestAgeBasic.args = args
unittest.TextTestRunner().run(suite)