blob: d56261f786d32f67b17fe950c08afa808e48f6b2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.migration;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.migration.MigrationImportStatus;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.impexp.ImportService;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.util.Map;
import java.util.zip.ZipFile;
public class ZipFileMigrationImporter implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ZipFileMigrationImporter.class);
private static final String APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS = "atlas.migration.mode.workers";
private static final String APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE = "atlas.migration.mode.batch.size";
private static final String DEFAULT_NUMBER_OF_WORKERS = "4";
private static final String DEFAULT_BATCH_SIZE = "100";
private static final String ZIP_FILE_COMMENT_ENTITIES_COUNT = "entitiesCount";
private static final String ZIP_FILE_COMMENT_TOTAL_COUNT = "total";
private final static String ENV_USER_NAME = "user.name";
private final ImportService importService;
private final String fileToImport;
private DataMigrationStatusService dataMigrationStatusService;
private MigrationImportStatus migrationImportStatus;
public ZipFileMigrationImporter(ImportService importService, String fileName) {
this.importService = importService;
this.fileToImport = fileName;
this.dataMigrationStatusService = new DataMigrationStatusService(AtlasGraphProvider.getGraphInstance());
}
@Override
public void run() {
try {
detectFileToImport();
int streamSize = getStreamSizeFromComment(fileToImport);
migrationImportStatus = getCreateMigrationStatus(fileToImport, streamSize);
performImport(fileToImport, streamSize, Long.toString(migrationImportStatus.getCurrentIndex()));
dataMigrationStatusService.setStatus("DONE");
} catch (IOException e) {
LOG.error("Migration Import: IO Error!", e);
dataMigrationStatusService.setStatus("FAIL");
} catch (AtlasBaseException e) {
LOG.error("Migration Import: Error!", e);
dataMigrationStatusService.setStatus("FAIL");
}
}
private MigrationImportStatus getCreateMigrationStatus(String fileName, int streamSize) {
MigrationImportStatus status = new MigrationImportStatus(fileName);
status.setTotalCount(streamSize);
MigrationImportStatus statusRetrieved = dataMigrationStatusService.getCreate(status);
LOG.info("DataMigrationStatusService: Position: {}", statusRetrieved.getCurrentIndex());
dataMigrationStatusService.setStatus("STARTED");
return statusRetrieved;
}
private void detectFileToImport() throws IOException {
FileWatcher fileWatcher = new FileWatcher(fileToImport);
fileWatcher.start();
}
private int getStreamSizeFromComment(String fileToImport) {
int ret = 1;
try {
ZipFile zipFile = new ZipFile(fileToImport);
String comment = zipFile.getComment();
ret = processZipFileStreamSizeComment(comment);
zipFile.close();
} catch (IOException e) {
LOG.error("Error opening ZIP file: {}", fileToImport, e);
}
return ret;
}
private int processZipFileStreamSizeComment(String comment) {
if (StringUtils.isEmpty(comment)) {
return 1;
}
Map map = AtlasType.fromJson(comment, Map.class);
int entitiesCount = (int) map.get(ZIP_FILE_COMMENT_ENTITIES_COUNT);
int totalCount = (int) map.get(ZIP_FILE_COMMENT_TOTAL_COUNT);
LOG.info("ZipFileMigrationImporter: Zip file: Comment: streamSize: {}: total: {}", entitiesCount, totalCount);
return entitiesCount;
}
private void performImport(String fileToImport, int streamSize, String startPosition) throws AtlasBaseException {
try {
LOG.info("Migration Import: {}: Starting at: {}...", fileToImport, startPosition);
InputStream fs = new FileInputStream(new File(fileToImport));
RequestContext.get().setUser(getUserNameFromEnvironment(), null);
importService.run(fs, getImportRequest(fileToImport, streamSize, startPosition),
getUserNameFromEnvironment(),
InetAddress.getLocalHost().getHostName(),
InetAddress.getLocalHost().getHostAddress());
} catch (Exception ex) {
LOG.error("Migration Import: Error loading zip for migration!", ex);
throw new AtlasBaseException(ex);
} finally {
LOG.info("Migration Import: {}: Done!", fileToImport);
}
}
private String getUserNameFromEnvironment() {
return System.getProperty(ENV_USER_NAME);
}
private AtlasImportRequest getImportRequest(String fileToImport, int streamSize, String position) throws AtlasException {
AtlasImportRequest request = new AtlasImportRequest();
request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION_FILE_NAME, fileToImport);
request.setSizeOption(streamSize);
request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION, "true");
request.setOption(AtlasImportRequest.OPTION_KEY_NUM_WORKERS, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS, DEFAULT_NUMBER_OF_WORKERS));
request.setOption(AtlasImportRequest.OPTION_KEY_BATCH_SIZE, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE, DEFAULT_BATCH_SIZE));
request.setOption(AtlasImportRequest.START_POSITION_KEY, (StringUtils.isEmpty(position) ? "0" : position));
return request;
}
private String getPropertyValue(String property, String defaultValue) throws AtlasException {
return ApplicationProperties.get().getString(property, defaultValue);
}
}