Python library for Apache Atlas.
Use the package manager pip to install Python client for Apache Atlas.
> pip install apache-atlas
Verify if apache-atlas client is installed:
> pip list Package Version ------------ --------- apache-atlas 0.0.11
python atlas_example.py
# atlas_example.py import time from apache_atlas.client.base_client import AtlasClient from apache_atlas.model.instance import AtlasEntity, AtlasEntityWithExtInfo, AtlasEntitiesWithExtInfo, AtlasRelatedObjectId from apache_atlas.model.enums import EntityOperation ## Step 1: create a client to connect to Apache Atlas server client = AtlasClient('http://localhost:21000', ('admin', 'atlasR0cks!')) # For Kerberos authentication, use HTTPKerberosAuth as shown below # # from requests_kerberos import HTTPKerberosAuth # # client = AtlasClient('http://localhost:21000', HTTPKerberosAuth()) # to disable SSL certificate validation (not recommended for production use!) # # client.session.verify = False ## Step 2: Let's create a database entity test_db = AtlasEntity({ 'typeName': 'hive_db' }) test_db.attributes = { 'name': 'test_db', 'clusterName': 'prod', 'qualifiedName': 'test_db@prod' } entity_info = AtlasEntityWithExtInfo() entity_info.entity = test_db print('Creating test_db') resp = client.entity.create_entity(entity_info) guid_db = resp.get_assigned_guid(test_db.guid) print(' created test_db: guid=' + guid_db) ## Step 3: Let's create a table entity, and two column entities - in one call test_tbl = AtlasEntity({ 'typeName': 'hive_table' }) test_tbl.attributes = { 'name': 'test_tbl', 'qualifiedName': 'test_db.test_tbl@prod' } test_tbl.relationshipAttributes = { 'db': AtlasRelatedObjectId({ 'guid': guid_db }) } test_col1 = AtlasEntity({ 'typeName': 'hive_column' }) test_col1.attributes = { 'name': 'test_col1', 'type': 'string', 'qualifiedName': 'test_db.test_tbl.test_col1@prod' } test_col1.relationshipAttributes = { 'table': AtlasRelatedObjectId({ 'guid': test_tbl.guid }) } test_col2 = AtlasEntity({ 'typeName': 'hive_column' }) test_col2.attributes = { 'name': 'test_col2', 'type': 'string', 'qualifiedName': 'test_db.test_tbl.test_col2@prod' } test_col2.relationshipAttributes = { 'table': AtlasRelatedObjectId({ 'guid': test_tbl.guid }) } entities_info = AtlasEntitiesWithExtInfo() entities_info.entities = [ test_tbl, test_col1, test_col2 ] print('Creating test_tbl') resp = client.entity.create_entities(entities_info) guid_tbl = resp.get_assigned_guid(test_tbl.guid) guid_col1 = resp.get_assigned_guid(test_col1.guid) guid_col2 = resp.get_assigned_guid(test_col2.guid) print(' created test_tbl: guid=' + guid_tbl) print(' created test_tbl.test_col1: guid=' + guid_col1) print(' created test_tbl.test_col2: guid=' + guid_col2) ## Step 4: Let's create a view entity that feeds from the table created earlier # Also create a lineage between the table and the view, and lineages between their columns as well test_view = AtlasEntity({ 'typeName': 'hive_table' }) test_view.attributes = { 'name': 'test_view', 'qualifiedName': 'test_db.test_view@prod' } test_view.relationshipAttributes = { 'db': AtlasRelatedObjectId({ 'guid': guid_db }) } test_view_col1 = AtlasEntity({ 'typeName': 'hive_column' }) test_view_col1.attributes = { 'name': 'test_col1', 'type': 'string', 'qualifiedName': 'test_db.test_view.test_col1@prod' } test_view_col1.relationshipAttributes = { 'table': AtlasRelatedObjectId({ 'guid': test_view.guid }) } test_view_col2 = AtlasEntity({ 'typeName': 'hive_column' }) test_view_col2.attributes = { 'name': 'test_col2', 'type': 'string', 'qualifiedName': 'test_db.test_view.test_col2@prod' } test_view_col2.relationshipAttributes = { 'table': AtlasRelatedObjectId({ 'guid': test_view.guid }) } test_process = AtlasEntity({ 'typeName': 'hive_process' }) test_process.attributes = { 'name': 'create_test_view', 'userName': 'admin', 'operationType': 'CREATE', 'qualifiedName': 'create_test_view@prod' } test_process.attributes['queryText'] = 'create view test_view as select * from test_tbl' test_process.attributes['queryPlan'] = '<queryPlan>' test_process.attributes['queryId'] = '<queryId>' test_process.attributes['startTime'] = int(time.time() * 1000) test_process.attributes['endTime'] = int(time.time() * 1000) test_process.relationshipAttributes = { 'inputs': [ AtlasRelatedObjectId({ 'guid': guid_tbl }) ], 'outputs': [ AtlasRelatedObjectId({ 'guid': test_view.guid }) ] } test_col1_lineage = AtlasEntity({ 'typeName': 'hive_column_lineage' }) test_col1_lineage.attributes = { 'name': 'test_view.test_col1 lineage', 'depenendencyType': 'read', 'qualifiedName': 'test_db.test_view.test_col1@prod' } test_col1_lineage.attributes['query'] = { 'guid': test_process.guid } test_col1_lineage.relationshipAttributes = { 'inputs': [ AtlasRelatedObjectId({ 'guid': guid_col1 }) ], 'outputs': [ AtlasRelatedObjectId({ 'guid': test_view_col1.guid }) ] } test_col2_lineage = AtlasEntity({ 'typeName': 'hive_column_lineage' }) test_col2_lineage.attributes = { 'name': 'test_view.test_col2 lineage', 'depenendencyType': 'read', 'qualifiedName': 'test_db.test_view.test_col2@prod' } test_col2_lineage.attributes['query'] = { 'guid': test_process.guid } test_col2_lineage.relationshipAttributes = { 'inputs': [ AtlasRelatedObjectId({ 'guid': guid_col2 }) ], 'outputs': [ AtlasRelatedObjectId({ 'guid': test_view_col2.guid }) ] } entities_info = AtlasEntitiesWithExtInfo() entities_info.entities = [ test_process, test_col1_lineage, test_col2_lineage ] entities_info.add_referenced_entity(test_view) entities_info.add_referenced_entity(test_view_col1) entities_info.add_referenced_entity(test_view_col2) print('Creating test_view') resp = client.entity.create_entities(entities_info) guid_view = resp.get_assigned_guid(test_view.guid) guid_view_col1 = resp.get_assigned_guid(test_view_col1.guid) guid_view_col2 = resp.get_assigned_guid(test_view_col2.guid) guid_process = resp.get_assigned_guid(test_process.guid) guid_col1_lineage = resp.get_assigned_guid(test_col1_lineage.guid) guid_col2_lineage = resp.get_assigned_guid(test_col2_lineage.guid) print(' created test_view: guid=' + guid_view) print(' created test_view.test_col1: guid=' + guid_view_col1) print(' created test_view.test_col2: guid=' + guid_view_col1) print(' created test_view lineage: guid=' + guid_process) print(' created test_col1 lineage: guid=' + guid_col1_lineage) print(' created test_col2 lineage: guid=' + guid_col2_lineage) ## Step 5: Finally, cleanup by deleting entities created above print('Deleting entities') resp = client.entity.delete_entities_by_guids([ guid_col1_lineage, guid_col2_lineage, guid_process, guid_view, guid_tbl, guid_db ]) deleted_count = len(resp.mutatedEntities[EntityOperation.DELETE.name]) if resp and resp.mutatedEntities and EntityOperation.DELETE.name in resp.mutatedEntities else 0 print(' ' + str(deleted_count) + ' entities deleted')
For more examples, checkout sample-app
python project in atlas-examples module.