blob: f8c9218c6bb335cbeaf1bc3e6ec30d8cff050a4c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.bulkimport;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.converters.AtlasFormatConverters;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.migration.DataMigrationStatusService;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
import org.apache.atlas.repository.store.graph.v2.AtlasRelationshipStoreV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
import org.apache.atlas.repository.store.graph.v2.IAtlasEntityChangeNotifier;
import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.EntityConsumerBuilder;
import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.EntityCreationManager;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasStringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MigrationImport extends ImportStrategy {
private static final Logger LOG = LoggerFactory.getLogger(MigrationImport.class);
private final AtlasGraph graph;
private final AtlasGraphProvider graphProvider;
private final AtlasTypeRegistry typeRegistry;
public MigrationImport(AtlasGraph graph, AtlasGraphProvider graphProvider, AtlasTypeRegistry typeRegistry) {
this.graph = graph;
this.graphProvider = graphProvider;
this.typeRegistry = typeRegistry;
LOG.info("MigrationImport: Using bulkLoading...");
}
public EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
if (entityStream == null || !entityStream.hasNext()) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
}
if (importResult.getRequest() == null) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "importResult should contain request");
}
DataMigrationStatusService dataMigrationStatusService = createMigrationStatusService(importResult);
long index = 0;
int streamSize = entityStream.size();
EntityMutationResponse ret = new EntityMutationResponse();
EntityCreationManager creationManager = createEntityCreationManager(importResult, dataMigrationStatusService);
try {
LOG.info("Migration Import: Size: {}: Starting...", streamSize);
index = creationManager.read(entityStream);
creationManager.drain();
creationManager.extractResults();
} catch (Exception ex) {
LOG.error("Migration Import: Error: Current position: {}", index, ex);
} finally {
shutdownEntityCreationManager(creationManager);
}
LOG.info("Migration Import: Size: {}: Done!", streamSize);
return ret;
}
private DataMigrationStatusService createMigrationStatusService(AtlasImportResult importResult) {
DataMigrationStatusService dataMigrationStatusService = new DataMigrationStatusService();
dataMigrationStatusService.init(AtlasStringUtil.getOption(importResult.getRequest().getOptions(), AtlasImportRequest.OPTION_KEY_MIGRATION_FILE_NAME));
return dataMigrationStatusService;
}
private EntityCreationManager createEntityCreationManager(AtlasImportResult importResult,
DataMigrationStatusService dataMigrationStatusService) {
AtlasGraph graphBulk = graphProvider.getBulkLoading();
EntityGraphRetriever entityGraphRetriever = new EntityGraphRetriever(this.graph, typeRegistry);
EntityGraphRetriever entityGraphRetrieverBulk = new EntityGraphRetriever(graphBulk, typeRegistry);
AtlasEntityStoreV2 entityStore = createEntityStore(this.graph, typeRegistry);
AtlasEntityStoreV2 entityStoreBulk = createEntityStore(graphBulk, typeRegistry);
int batchSize = importResult.getRequest().getOptionKeyBatchSize();
int numWorkers = getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers());
EntityConsumerBuilder consumerBuilder =
new EntityConsumerBuilder(typeRegistry, this.graph, entityStore, entityGraphRetriever, graphBulk,
entityStoreBulk, entityGraphRetrieverBulk, batchSize);
LOG.info("MigrationImport: EntityCreationManager: Created!");
return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult, dataMigrationStatusService);
}
private static int getNumWorkers(int numWorkersFromOptions) {
int ret = (numWorkersFromOptions > 0) ? numWorkersFromOptions : 1;
LOG.info("Migration Import: Setting numWorkers: {}", ret);
return ret;
}
private AtlasEntityStoreV2 createEntityStore(AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
FullTextMapperV2Nop fullTextMapperV2 = new FullTextMapperV2Nop();
IAtlasEntityChangeNotifier entityChangeNotifier = new EntityChangeNotifierNop();
DeleteHandlerDelegate deleteDelegate = new DeleteHandlerDelegate(graph, typeRegistry, null);
AtlasFormatConverters formatConverters = new AtlasFormatConverters(typeRegistry);
AtlasInstanceConverter instanceConverter = new AtlasInstanceConverter(graph, typeRegistry, formatConverters);
AtlasRelationshipStore relationshipStore = new AtlasRelationshipStoreV2(graph, typeRegistry, deleteDelegate, entityChangeNotifier);
EntityGraphMapper entityGraphMapper = new EntityGraphMapper(deleteDelegate, typeRegistry, graph, relationshipStore, entityChangeNotifier, instanceConverter, fullTextMapperV2, null);
return new AtlasEntityStoreV2(graph, deleteDelegate, typeRegistry, entityChangeNotifier, entityGraphMapper);
}
private void shutdownEntityCreationManager(EntityCreationManager creationManager) {
try {
creationManager.shutdown();
} catch (InterruptedException e) {
LOG.error("Migration Import: Shutdown: Interrupted!", e);
}
}
}