# Licensed to the Apache Software Foundation (ASF) under one | |
# or more contributor license agreements. See the NOTICE file | |
# distributed with this work for additional information | |
# regarding copyright ownership. The ASF licenses this file | |
# to you under the Apache License, Version 2.0 (the | |
# "License"); you may not use this file except in compliance | |
# with the License. You may obtain a copy of the License at | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# Unless required by applicable law or agreed to in writing, | |
# software distributed under the License is distributed on an | |
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
# KIND, either express or implied. See the License for the | |
# specific language governing permissions and limitations | |
# under the License. | |
from age.models import Vertex | |
import unittest | |
import decimal | |
import age | |
import argparse | |
DSN = "host=127.0.0.1 port=5432 dbname=postgres user=postgres password=agens" | |
class TestAgeBasic(unittest.TestCase): | |
ag = None | |
def setUp(self): | |
print("Connecting to Test Graph.....") | |
TEST_DB = self.args.database | |
TEST_USER = self.args.user | |
TEST_PASSWORD = self.args.password | |
TEST_PORT = self.args.port | |
TEST_HOST = self.args.host | |
TEST_GRAPH_NAME = self.args.graphName | |
self.ag = age.connect(graph=TEST_GRAPH_NAME, host=TEST_HOST, port=TEST_PORT, dbname=TEST_DB, user=TEST_USER, password=TEST_PASSWORD) | |
def tearDown(self): | |
# Clear test data | |
print("Deleting Test Graph.....") | |
age.deleteGraph(self.ag.connection, self.ag.graphName) | |
self.ag.close() | |
def testExec(self): | |
print("\n---------------------------------------------------") | |
print("Test 1: Checking single and multi column Returns.....") | |
print("---------------------------------------------------\n") | |
ag = self.ag | |
# Create and Return single column | |
cursor = ag.execCypher("CREATE (n:Person {name: %s, title: 'Developer'}) RETURN n", params=('Andy',)) | |
for row in cursor: | |
print("Vertex: %s , Type: %s " % (Vertex, type(row[0]))) | |
# Create and Return multi columns | |
cursor = ag.execCypher("CREATE (n:Person {name: %s, title: %s}) RETURN id(n), n.name", cols=['id','name'], params=('Jack','Manager')) | |
row = cursor.fetchone() | |
print("Id: %s , Name: %s" % (row[0], row[1])) | |
self.assertEqual(int, type(row[0])) | |
ag.commit() | |
print("\nTest 1 Successful....") | |
def testQuery(self): | |
print("\n--------------------------------------------------") | |
print("Test 2: Testing CREATE and query relationships....." ) | |
print("--------------------------------------------------\n") | |
ag = self.ag | |
ag.execCypher("CREATE (n:Person {name: %s}) ", params=('Jack',)) | |
ag.execCypher("CREATE (n:Person {name: %s}) ", params=('Andy',)) | |
ag.execCypher("CREATE (n:Person {name: %s}) ", params=('Smith',)) | |
ag.execCypher("MATCH (a:Person), (b:Person) WHERE a.name = 'Andy' AND b.name = 'Jack' CREATE (a)-[r:worksWith {weight: 3}]->(b)") | |
ag.execCypher("""MATCH (a:Person), (b:Person) | |
WHERE a.name = %s AND b.name = %s | |
CREATE p=((a)-[r:worksWith]->(b)) """, params=('Jack', 'Smith',)) | |
ag.commit() | |
cursor = ag.execCypher("MATCH p=()-[:worksWith]-() RETURN p") | |
for row in cursor: | |
path = row[0] | |
print("START:", path[0]) | |
print("EDGE:", path[1]) | |
print("END:", path[2]) | |
cursor = ag.execCypher("MATCH p=(a)-[b]-(c) WHERE b.weight>2 RETURN a,label(b), b.weight, c", cols=["a","bl","bw", "c"], params=(2,)) | |
for row in cursor: | |
start = row[0] | |
edgel = row[1] | |
edgew = row[2] | |
end = row[3] | |
print("Relationship: %s %s %s. Edge weight: %s" % (start["name"] , edgel,end["name"], edgew)) | |
#Assert that the weight of the edge is greater than 2 | |
self.assertEqual(edgew > 2, True) | |
print("\nTest 2 Successful...") | |
def testChangeData(self): | |
print("\n-------------------------------------------------------") | |
print("Test 3: Testing changes in data using SET and REMOVE.....") | |
print("-------------------------------------------------------\n") | |
ag = self.ag | |
# Create Vertices | |
# Commit automatically | |
ag.execCypher("CREATE (n:Person {name: 'Joe'})") | |
cursor = ag.execCypher("CREATE (n:Person {name: %s, title: 'Developer'}) RETURN n", params=('Smith',)) | |
row = cursor.fetchone() | |
print("CREATED: ", row[0]) | |
# You must commit explicitly | |
ag.commit() | |
cursor = ag.execCypher("MATCH (n:Person {name: %s}) SET n.title=%s RETURN n", params=('Smith','Manager',)) | |
row = cursor.fetchone() | |
vertex = row[0] | |
title1 = vertex["title"] | |
print("SET title: ", title1) | |
ag.commit() | |
cursor = ag.execCypher("MATCH (p:Person {name: 'Smith'}) RETURN p.title") | |
row = cursor.fetchone() | |
title2 = row[0] | |
self.assertEqual(title1, title2) | |
cursor = ag.execCypher("MATCH (n:Person {name: %s}) SET n.bigNum=-6.45161e+46::numeric RETURN n", params=('Smith',)) | |
row = cursor.fetchone() | |
vertex = row[0] | |
for row in cursor: | |
print("SET bigNum: ", vertex['bigNum']) | |
bigNum1 = vertex["bigNum"] | |
self.assertEqual(decimal.Decimal("-6.45161e+46"), bigNum1) | |
ag.commit() | |
cursor = ag.execCypher("MATCH (p:Person {name: 'Smith'}) RETURN p.bigNum") | |
row = cursor.fetchone() | |
bigNum2 = row[0] | |
self.assertEqual(bigNum1, bigNum2) | |
cursor = ag.execCypher("MATCH (n:Person {name: %s}) REMOVE n.title RETURN n", params=('Smith',)) | |
for row in cursor: | |
print("REMOVE Prop title: ", row[0]) | |
#Assert that the title property is removed | |
self.assertIsNone(row[0].properties.get('title')) | |
print("\nTest 3 Successful....") | |
# You must commit explicitly | |
ag.commit() | |
def testCypher(self): | |
print("\n--------------------------") | |
print("Test 4: Testing Cypher.....") | |
print("--------------------------\n") | |
ag = self.ag | |
with ag.connection.cursor() as cursor: | |
try : | |
ag.cypher(cursor, "CREATE (n:Person {name: %s}) ", params=('Joe',)) | |
ag.cypher(cursor, "CREATE (n:Person {name: %s}) ", params=('Jack',)) | |
ag.cypher(cursor, "CREATE (n:Person {name: %s}) ", params=('Andy',)) | |
ag.cypher(cursor, "CREATE (n:Person {name: %s}) ", params=('Smith',)) | |
ag.cypher(cursor, "CREATE (n:Person {name: %s}) ", params=('Tom',)) | |
# You must commit explicitly | |
ag.commit() | |
except Exception as ex: | |
print(ex) | |
ag.rollback() | |
with ag.connection.cursor() as cursor: | |
try :# Create Edges | |
ag.cypher(cursor,"MATCH (a:Person), (b:Person) WHERE a.name = 'Joe' AND b.name = 'Smith' CREATE (a)-[r:worksWith {weight: 3}]->(b)") | |
ag.cypher(cursor,"MATCH (a:Person), (b:Person) WHERE a.name = 'Andy' AND b.name = 'Tom' CREATE (a)-[r:worksWith {weight: 1}]->(b)") | |
ag.cypher(cursor,"MATCH (a:Person {name: 'Jack'}), (b:Person {name: 'Andy'}) CREATE (a)-[r:worksWith {weight: 5}]->(b)") | |
# You must commit explicitly | |
ag.commit() | |
except Exception as ex: | |
print(ex) | |
ag.rollback() | |
# With Params | |
cursor = ag.execCypher("""MATCH (a:Person), (b:Person) | |
WHERE a.name = %s AND b.name = %s | |
CREATE p=((a)-[r:worksWith]->(b)) RETURN p""", | |
params=('Andy', 'Smith',)) | |
for row in cursor: | |
print("CREATED EDGE: %s" % row[0]) | |
cursor = ag.execCypher("""MATCH (a:Person {name: 'Joe'}), (b:Person {name: 'Jack'}) | |
CREATE p=((a)-[r:worksWith {weight: 5}]->(b)) | |
RETURN p """) | |
for row in cursor: | |
print("CREATED EDGE WITH PROPERTIES: %s" % row[0]) | |
self.assertEqual(row[0][1].properties['weight'], 5) | |
print("\nTest 4 Successful...") | |
def testMultipleEdges(self): | |
print("\n------------------------------------") | |
print("Test 5: Testing Multiple Edges.....") | |
print("------------------------------------\n") | |
ag = self.ag | |
with ag.connection.cursor() as cursor: | |
try : | |
ag.cypher(cursor, "CREATE (n:Country {name: %s}) ", params=('USA',)) | |
ag.cypher(cursor, "CREATE (n:Country {name: %s}) ", params=('France',)) | |
ag.cypher(cursor, "CREATE (n:Country {name: %s}) ", params=('Korea',)) | |
ag.cypher(cursor, "CREATE (n:Country {name: %s}) ", params=('Russia',)) | |
# You must commit explicitly after all executions. | |
ag.connection.commit() | |
except Exception as ex: | |
ag.rollback() | |
raise ex | |
with ag.connection.cursor() as cursor: | |
try :# Create Edges | |
ag.cypher(cursor,"MATCH (a:Country), (b:Country) WHERE a.name = 'USA' AND b.name = 'France' CREATE (a)-[r:distance {unit:'miles', value: 4760}]->(b)") | |
ag.cypher(cursor,"MATCH (a:Country), (b:Country) WHERE a.name = 'France' AND b.name = 'Korea' CREATE (a)-[r:distance {unit: 'km', value: 9228}]->(b)") | |
ag.cypher(cursor,"MATCH (a:Country {name: 'Korea'}), (b:Country {name: 'Russia'}) CREATE (a)-[r:distance {unit:'km', value: 3078}]->(b)") | |
# You must commit explicitly | |
ag.connection.commit() | |
except Exception as ex: | |
ag.rollback() | |
raise ex | |
cursor = ag.execCypher("""MATCH p=(:Country {name:"USA"})-[:distance]-(:Country)-[:distance]-(:Country) | |
RETURN p""") | |
count = 0 | |
output = [] | |
for row in cursor: | |
path = row[0] | |
for e in path: | |
if e.gtype == age.TP_VERTEX: | |
output.append(e.label + " " + e["name"]) | |
elif e.gtype == age.TP_EDGE: | |
output.append("---- (distance " + str(e["value"]) + " " + e["unit"] + ") --->") | |
else: | |
output.append("Unknown element. " + str(e)) | |
count += 1 | |
formatted_output = " ".join(output) | |
print("PATH WITH MULTIPLE EDGES: %s" % formatted_output) | |
self.assertEqual(5,count) | |
print("\nTest 5 Successful...") | |
def testCollect(self): | |
print("\n--------------------------") | |
print("Test 6: Testing COLLECT.....") | |
print("--------------------------\n") | |
ag = self.ag | |
with ag.connection.cursor() as cursor: | |
try : | |
ag.cypher(cursor, "CREATE (n:Person {name: %s}) ", params=('Joe',)) | |
ag.cypher(cursor, "CREATE (n:Person {name: %s}) ", params=('Jack',)) | |
ag.cypher(cursor, "CREATE (n:Person {name: %s}) ", params=('Andy',)) | |
ag.cypher(cursor, "CREATE (n:Person {name: %s}) ", params=('Smith',)) | |
ag.cypher(cursor, "CREATE (n:Person {name: %s}) ", params=('Tom',)) | |
# You must commit explicitly | |
ag.commit() | |
except Exception as ex: | |
print(ex) | |
ag.rollback() | |
with ag.connection.cursor() as cursor: | |
try :# Create Edges | |
ag.cypher(cursor,"MATCH (a:Person), (b:Person) WHERE a.name = 'Joe' AND b.name = 'Smith' CREATE (a)-[r:worksWith {weight: 3}]->(b)") | |
ag.cypher(cursor,"MATCH (a:Person), (b:Person) WHERE a.name = 'Joe' AND b.name = 'Tom' CREATE (a)-[r:worksWith {weight: 1}]->(b)") | |
ag.cypher(cursor,"MATCH (a:Person {name: 'Joe'}), (b:Person {name: 'Andy'}) CREATE (a)-[r:worksWith {weight: 5}]->(b)") | |
# You must commit explicitly | |
ag.commit() | |
except Exception as ex: | |
print(ex) | |
ag.rollback() | |
print(" -------- TESTING COLLECT #1 --------") | |
with ag.connection.cursor() as cursor: | |
ag.cypher(cursor, "MATCH (a)-[:worksWith]->(c) WITH a as V, COLLECT(c) as CV RETURN V.name, CV", cols=["V","CV"]) | |
for row in cursor: | |
nm = row[0] | |
collected = row[1] | |
print(nm, "worksWith", [i["name"] for i in collected]) | |
self.assertEqual(3,len(collected)) | |
print(" -------- TESTING COLLECT #2 --------") | |
for row in ag.execCypher("MATCH (a)-[:worksWith]->(c) WITH a as V, COLLECT(c) as CV RETURN V.name, CV", cols=["V1","CV"]): | |
nm = row[0] | |
collected = row[1] | |
print(nm, "worksWith", [i["name"] for i in collected]) | |
self.assertEqual(3,len(collected)) | |
print("\nTest 6 Successful...") | |
if __name__ == '__main__': | |
parser = argparse.ArgumentParser() | |
parser.add_argument('-host', | |
'--host', | |
help='Optional Host Name. Default Host is "127.0.0.1" ', | |
default="127.0.0.1") | |
parser.add_argument('-port', | |
'--port', | |
help='Optional Port Number. Default port no is 5432', | |
default=5432) | |
parser.add_argument('-db', | |
'--database', | |
help='Required Database Name', | |
required=True) | |
parser.add_argument('-u', | |
'--user', | |
help='Required Username Name', | |
required=True) | |
parser.add_argument('-pass', | |
'--password', | |
help='Required Password for authentication', | |
required=True) | |
parser.add_argument('-gn', | |
'--graphName', | |
help='Optional Graph Name to be created. Default graphName is "test_graph"', | |
default="test_graph") | |
args = parser.parse_args() | |
suite = unittest.TestSuite() | |
suite.addTest(TestAgeBasic('testExec')) | |
suite.addTest(TestAgeBasic('testQuery')) | |
suite.addTest(TestAgeBasic('testChangeData')) | |
suite.addTest(TestAgeBasic('testCypher')) | |
suite.addTest(TestAgeBasic('testMultipleEdges')) | |
suite.addTest(TestAgeBasic('testCollect')) | |
TestAgeBasic.args = args | |
unittest.TextTestRunner().run(suite) |