| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.atlas.repository.impexp; |
| |
| |
| import org.apache.atlas.TestModules; |
| import org.apache.atlas.TestUtilsV2; |
| import org.apache.atlas.exception.AtlasBaseException; |
| import org.apache.atlas.model.impexp.AtlasExportRequest; |
| import org.apache.atlas.model.impexp.AtlasExportResult; |
| import org.apache.atlas.model.instance.AtlasEntity; |
| import org.apache.atlas.model.instance.AtlasObjectId; |
| import org.apache.atlas.model.typedef.AtlasTypesDef; |
| import org.apache.atlas.repository.graph.AtlasGraphProvider; |
| import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer; |
| import org.apache.atlas.repository.store.graph.v1.AtlasEntityChangeNotifier; |
| import org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1; |
| import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream; |
| import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1; |
| import org.apache.atlas.repository.store.graph.v1.EntityGraphMapper; |
| import org.apache.atlas.repository.store.graph.v1.SoftDeleteHandlerV1; |
| import org.apache.atlas.store.AtlasTypeDefStore; |
| import org.apache.atlas.type.AtlasTypeRegistry; |
| import org.powermock.reflect.Whitebox; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.testng.Assert; |
| import org.testng.annotations.AfterClass; |
| import org.testng.annotations.BeforeClass; |
| import org.testng.annotations.Guice; |
| import org.testng.annotations.Test; |
| import scala.actors.threadpool.Arrays; |
| |
| import javax.inject.Inject; |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import static org.mockito.Mockito.mock; |
| import static org.testng.Assert.assertEquals; |
| import static org.testng.Assert.assertNotNull; |
| import static org.testng.Assert.assertTrue; |
| |
| @Guice(modules = TestModules.TestOnlyModule.class) |
| public class ExportServiceTest { |
| private static final Logger LOG = LoggerFactory.getLogger(ExportServiceTest.class); |
| |
| @Inject |
| AtlasTypeRegistry typeRegistry; |
| |
| @Inject |
| private AtlasTypeDefStore typeDefStore; |
| |
| @Inject |
| private EntityGraphMapper graphMapper; |
| @Inject |
| ExportService exportService; |
| private DeleteHandlerV1 deleteHandler = mock(SoftDeleteHandlerV1.class);; |
| private AtlasEntityChangeNotifier mockChangeNotifier = mock(AtlasEntityChangeNotifier.class); |
| private AtlasEntityStoreV1 entityStore; |
| |
| @BeforeClass |
| public void setupSampleData() throws AtlasBaseException { |
| entityStore = new AtlasEntityStoreV1(deleteHandler, typeRegistry, mockChangeNotifier, graphMapper);; |
| |
| AtlasTypesDef sampleTypes = TestUtilsV2.defineDeptEmployeeTypes(); |
| AtlasTypesDef typesToCreate = AtlasTypeDefStoreInitializer.getTypesToCreate(sampleTypes, typeRegistry); |
| |
| if (!typesToCreate.isEmpty()) { |
| typeDefStore.createTypesDef(typesToCreate); |
| } |
| |
| AtlasEntity.AtlasEntitiesWithExtInfo hrDept = TestUtilsV2.createDeptEg2(); |
| |
| AtlasEntityStream entityStream = new AtlasEntityStream(hrDept); |
| entityStore.createOrUpdate(entityStream, false); |
| LOG.debug("==> setupSampleData: ", AtlasEntity.dumpObjects(hrDept.getEntities(), null).toString()); |
| } |
| |
| @AfterClass |
| public void clear() { |
| AtlasGraphProvider.cleanup(); |
| } |
| |
| private AtlasExportRequest getRequestForFullFetch() { |
| AtlasExportRequest request = new AtlasExportRequest(); |
| |
| List<AtlasObjectId> itemsToExport = new ArrayList<>(); |
| itemsToExport.add(new AtlasObjectId("hive_db", "qualifiedName", "default@cl1")); |
| request.setItemsToExport(itemsToExport); |
| |
| return request; |
| } |
| |
| private AtlasExportRequest getRequestForDept(boolean addFetchType, String fetchTypeValue, boolean addMatchType, String matchTypeValue) { |
| AtlasExportRequest request = new AtlasExportRequest(); |
| |
| List<AtlasObjectId> itemsToExport = new ArrayList<>(); |
| itemsToExport.add(new AtlasObjectId("Department", "name", "hr")); |
| request.setItemsToExport(itemsToExport); |
| |
| setOptionsMap(request, addFetchType, fetchTypeValue, addMatchType, matchTypeValue); |
| return request; |
| } |
| |
| private AtlasExportRequest getRequestForEmployee() { |
| AtlasExportRequest request = new AtlasExportRequest(); |
| |
| List<AtlasObjectId> itemsToExport = new ArrayList<>(); |
| itemsToExport.add(new AtlasObjectId("Employee", "name", "Max")); |
| request.setItemsToExport(itemsToExport); |
| |
| setOptionsMap(request, true, "CONNECTED", false, ""); |
| return request; |
| } |
| |
| private void setOptionsMap(AtlasExportRequest request, |
| boolean addFetchType, String fetchTypeValue, boolean addMatchType, String matchTypeValue) { |
| Map<String, Object> optionsMap = null; |
| if(addFetchType) { |
| if(optionsMap == null) { |
| optionsMap = new HashMap<>(); |
| } |
| |
| optionsMap.put("fetchType", fetchTypeValue); |
| request.setOptions(optionsMap); |
| } |
| |
| if(addMatchType) { |
| if(optionsMap == null) { |
| optionsMap = new HashMap<>(); |
| } |
| |
| optionsMap.put("matchType", matchTypeValue); |
| } |
| |
| if(optionsMap != null) { |
| request.setOptions(optionsMap); |
| } |
| } |
| |
| private ZipSource runExportWithParameters(AtlasExportRequest request) throws AtlasBaseException, IOException { |
| final String requestingIP = "1.0.0.0"; |
| final String hostName = "localhost"; |
| final String userName = "admin"; |
| |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| ZipSink zipSink = new ZipSink(baos); |
| AtlasExportResult result = exportService.run(zipSink, request, userName, hostName, requestingIP); |
| |
| zipSink.close(); |
| |
| ByteArrayInputStream bis = new ByteArrayInputStream(baos.toByteArray()); |
| ZipSource zipSource = new ZipSource(bis); |
| return zipSource; |
| } |
| |
| @Test |
| public void exportType_Succeeds() throws AtlasBaseException, FileNotFoundException { |
| String requestingIP = "1.0.0.0"; |
| String hostName = "root"; |
| |
| AtlasExportRequest request = getRequestForFullFetch(); |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| ZipSink zipSink = new ZipSink(baos); |
| AtlasExportResult result = exportService.run(zipSink, request, "admin", hostName, requestingIP); |
| |
| assertNotNull(exportService); |
| assertEquals(result.getHostName(), hostName); |
| assertEquals(result.getClientIpAddress(), requestingIP); |
| assertEquals(request, result.getRequest()); |
| } |
| |
| @Test |
| public void requestingEntityNotFound_NoData() throws AtlasBaseException, IOException { |
| String requestingIP = "1.0.0.0"; |
| String hostName = "root"; |
| |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| ZipSink zipSink = new ZipSink(baos); |
| AtlasExportResult result = exportService.run( |
| zipSink, getRequestForFullFetch(), "admin", hostName, requestingIP); |
| |
| Assert.assertNull(result.getData()); |
| |
| ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); |
| ZipSource zipSource = new ZipSource(bais); |
| |
| assertNotNull(exportService); |
| assertNotNull(zipSource.getCreationOrder()); |
| Assert.assertFalse(zipSource.hasNext()); |
| } |
| |
| @Test |
| public void requestingEntityFoundDefaultFetch_ContainsData() throws Exception { |
| ZipSource source = runExportWithParameters( |
| getRequestForDept(false, "", false, "")); |
| verifyExportForHrData(source); |
| } |
| |
| @Test |
| public void requestingHrEntityWithMatchTypeContains_ContainsData() throws Exception { |
| ZipSource source = runExportWithParameters( |
| getRequestForDept(false, "", true, "CONTAINS")); |
| verifyExportForHrData(source); |
| } |
| |
| @Test |
| public void requestingHrEntityWithMatchTypeEndsWith_ContainsData() throws Exception { |
| ZipSource source = runExportWithParameters( |
| getRequestForDept(false, "", true, "ENDSWITH")); |
| verifyExportForHrData(source); |
| } |
| |
| @Test |
| public void requestingDeptEntityFoundFullFetch_ContainsData() throws Exception { |
| ZipSource source = runExportWithParameters( |
| getRequestForDept(true, "FULL", false, "")); |
| verifyExportForHrData(source); |
| } |
| |
| @Test |
| public void requestingDeptEntityFoundConnectedFetch_ContainsData() throws Exception { |
| ZipSource source = runExportWithParameters( |
| getRequestForDept(true, "CONNECTED", false, "")); |
| verifyExportForHrDataForConnected(source); |
| } |
| |
| @Test |
| public void requestingEmployeeEntityFoundConnectedFetch_ContainsData() throws Exception { |
| ZipSource zipSource = runExportWithParameters(getRequestForEmployee()); |
| verifyExportForEmployeeData(zipSource); |
| } |
| |
| @Test |
| public void verifyOverallStatus() throws Exception { |
| |
| // ExportService service = new ExportService(typeRegistry); |
| assertEquals(AtlasExportResult.OperationStatus.FAIL, Whitebox.invokeMethod(exportService, |
| "getOverallOperationStatus")); |
| |
| assertEquals(AtlasExportResult.OperationStatus.SUCCESS, Whitebox.invokeMethod(exportService, |
| "getOverallOperationStatus", |
| AtlasExportResult.OperationStatus.SUCCESS)); |
| |
| assertEquals(AtlasExportResult.OperationStatus.SUCCESS, Whitebox.invokeMethod(exportService, |
| "getOverallOperationStatus", |
| AtlasExportResult.OperationStatus.SUCCESS, |
| AtlasExportResult.OperationStatus.SUCCESS, |
| AtlasExportResult.OperationStatus.SUCCESS)); |
| |
| assertEquals(AtlasExportResult.OperationStatus.PARTIAL_SUCCESS, Whitebox.invokeMethod(exportService, |
| "getOverallOperationStatus", |
| AtlasExportResult.OperationStatus.FAIL, |
| AtlasExportResult.OperationStatus.PARTIAL_SUCCESS, |
| AtlasExportResult.OperationStatus.SUCCESS)); |
| |
| assertEquals(AtlasExportResult.OperationStatus.PARTIAL_SUCCESS, Whitebox.invokeMethod(exportService, |
| "getOverallOperationStatus", |
| AtlasExportResult.OperationStatus.FAIL, |
| AtlasExportResult.OperationStatus.FAIL, |
| AtlasExportResult.OperationStatus.PARTIAL_SUCCESS)); |
| |
| assertEquals(AtlasExportResult.OperationStatus.FAIL, Whitebox.invokeMethod(exportService, |
| "getOverallOperationStatus", |
| AtlasExportResult.OperationStatus.FAIL, |
| AtlasExportResult.OperationStatus.FAIL, |
| AtlasExportResult.OperationStatus.FAIL)); |
| |
| |
| } |
| |
| @Test |
| public void requestingExportOfNonExistentEntity_ReturnsFailure() throws Exception { |
| AtlasExportRequest request = getRequestForEmployee(); |
| tamperEmployeeRequest(request); |
| ZipSource zipSource = runExportWithParameters(request); |
| |
| assertNotNull(zipSource.getCreationOrder()); |
| assertEquals(zipSource.getCreationOrder().size(), 0); |
| assertEquals(AtlasExportResult.OperationStatus.FAIL, zipSource.getExportResult().getOperationStatus()); |
| } |
| |
| private void tamperEmployeeRequest(AtlasExportRequest request) { |
| AtlasObjectId objectId = request.getItemsToExport().get(0); |
| objectId.getUniqueAttributes().remove("name"); |
| objectId.getUniqueAttributes().put("qualifiedName", "XXX@121"); |
| } |
| |
| private void verifyExportForEmployeeData(ZipSource zipSource) throws AtlasBaseException { |
| final List<String> expectedEntityTypes = Arrays.asList(new String[]{"Manager", "Employee", "Department"}); |
| |
| assertNotNull(zipSource.getCreationOrder()); |
| assertEquals(zipSource.getCreationOrder().size(), 2); |
| assertTrue(zipSource.hasNext()); |
| |
| while (zipSource.hasNext()) { |
| AtlasEntity entity = zipSource.next(); |
| |
| assertNotNull(entity); |
| assertEquals(AtlasEntity.Status.ACTIVE, entity.getStatus()); |
| assertTrue(expectedEntityTypes.contains(entity.getTypeName())); |
| } |
| |
| verifyTypeDefs(zipSource); |
| } |
| |
| private void verifyExportForHrData(ZipSource zipSource) throws IOException, AtlasBaseException { |
| assertNotNull(zipSource.getCreationOrder()); |
| assertTrue(zipSource.getCreationOrder().size() == 1); |
| assertTrue(zipSource.hasNext()); |
| |
| AtlasEntity entity = zipSource.next(); |
| |
| assertNotNull(entity); |
| assertTrue(entity.getTypeName().equals("Department")); |
| assertEquals(entity.getStatus(), AtlasEntity.Status.ACTIVE); |
| verifyTypeDefs(zipSource); |
| } |
| |
| private void verifyExportForHrDataForConnected(ZipSource zipSource) throws IOException, AtlasBaseException { |
| assertNotNull(zipSource.getCreationOrder()); |
| assertTrue(zipSource.getCreationOrder().size() == 2); |
| assertTrue(zipSource.hasNext()); |
| |
| AtlasEntity entity = zipSource.next(); |
| |
| assertNotNull(entity); |
| assertTrue(entity.getTypeName().equals("Department")); |
| assertEquals(entity.getStatus(), AtlasEntity.Status.ACTIVE); |
| verifyTypeDefs(zipSource); |
| } |
| |
| private void verifyTypeDefs(ZipSource zipSource) throws AtlasBaseException { |
| assertEquals(zipSource.getTypesDef().getEnumDefs().size(), 1); |
| assertEquals(zipSource.getTypesDef().getClassificationDefs().size(), 0); |
| assertEquals(zipSource.getTypesDef().getStructDefs().size(), 1); |
| assertEquals(zipSource.getTypesDef().getEntityDefs().size(), 4); |
| } |
| } |