blob: 2b3f1792c3f2092015dac03004d22af886e26268 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.migration;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.migration.MigrationImportStatus;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.impexp.ImportService;
import org.apache.atlas.repository.impexp.ZipExportFileNames;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.FileFilter;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipFile;
/**
 * Imports one or more Atlas export zip files in migration mode.
 * <p>
 * The file name given to the constructor is either the path of an existing file or a path whose
 * last component contains wildcards; in the latter case every matching {@code .zip} file in the
 * parent directory is imported, in sorted order, when {@link #run()} is invoked. Progress is
 * recorded through {@link DataMigrationStatusService}, and each successfully imported file is moved
 * into an {@code archive} sub-directory of the input's parent directory when that directory is usable.
 */
public class ZipFileMigrationImporter implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ZipFileMigrationImporter.class);

    private static final String APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS = "atlas.migration.mode.workers";
    private static final String APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE       = "atlas.migration.mode.batch.size";
    private static final String DEFAULT_NUMBER_OF_WORKERS                        = "4";
    private static final String DEFAULT_BATCH_SIZE                               = "100";
    private static final String ZIP_FILE_COMMENT_ENTITIES_COUNT                  = "entitiesCount";
    private static final String ZIP_FILE_COMMENT_TOTAL_COUNT                     = "total";
    private static final String FILE_EXTENSION_ZIP                               = ".zip";

    private final static String ENV_USER_NAME = "user.name";
    private final static String ARCHIVE_DIR   = "archive";

    private final ImportService importService;
    private List<String> filesToImport;
    private DataMigrationStatusService dataMigrationStatusService;
    private MigrationImportStatus migrationImportStatus;
    private File archiveDir;

    /**
     * Creates an importer for the given zip file(s). The files to import are resolved immediately.
     *
     * @param importService service used to run each import
     * @param fileName      path of the zip file to import; the last path component may contain
     *                      wildcards, in which case all matching {@code .zip} files are imported
     */
    public ZipFileMigrationImporter(ImportService importService, String fileName) {
        this.importService              = importService;
        this.dataMigrationStatusService = new DataMigrationStatusService(AtlasGraphProvider.getGraphInstance());

        initialize(fileName);
    }

    /**
     * Resolves the files to import and, if there is at least one, prepares the archive directory.
     */
    private void initialize(String fileName) {
        this.filesToImport = getAllFilesToImport(fileName);

        if (CollectionUtils.isNotEmpty(this.filesToImport)) {
            createArchiveDirectory(fileName);
        }
    }

    /**
     * Imports every resolved file in turn. An {@link IOException} or {@link AtlasBaseException} on one
     * file is logged, recorded as {@code FAIL} in the migration status, and the loop continues with the
     * next file. Successfully imported files are marked {@code DONE} and then archived.
     */
    @Override
    public void run() {
        for (String fileToImport : filesToImport) {
            try {
                detectFileToImport(fileToImport);

                int streamSize = getStreamSizeFromComment(fileToImport);

                migrationImportStatus = getCreateMigrationStatus(fileToImport, streamSize);

                // Start at the index stored in the migration status; presumably non-zero when an earlier
                // run of the same file was interrupted — confirm in DataMigrationStatusService.getCreate.
                performImport(fileToImport, streamSize, Long.toString(migrationImportStatus.getCurrentIndex()));

                dataMigrationStatusService.setStatus("DONE");

                moveZipFileToArchiveDir(fileToImport);
            } catch (IOException e) {
                LOG.error("Migration Import: IO Error!", e);
                dataMigrationStatusService.setStatus("FAIL");
            } catch (AtlasBaseException e) {
                LOG.error("Migration Import: Error!", e);
                dataMigrationStatusService.setStatus("FAIL");
            }
        }
    }

    /**
     * Resolves {@code fileName} to the list of absolute paths to import.
     * <p>
     * If {@code fileName} names an existing regular file, only that file is returned. Otherwise its
     * last path component is treated as a wildcard pattern and matched against the entries of its
     * parent directory; matching regular files ending in {@code .zip} (case-insensitive) are returned
     * in sorted order.
     *
     * @return absolute paths of the files to import; never {@code null}, possibly empty
     */
    private List<String> getAllFilesToImport(String fileName) {
        List<String> ret          = new ArrayList<>();
        File         fileToImport = new File(fileName);

        if (fileToImport.exists() && fileToImport.isFile()) {
            // Input file present, so there is no need to expand a pattern
            LOG.info("Migration Import: zip file for import: " + fileToImport);
            ret.add(fileToImport.getAbsolutePath());

            return ret;
        }

        // The file name might contain a wildcard: match it against the parent directory's entries.
        // getParentDirectory() also copes with bare names such as "*.zip" that have no parent component.
        File importDataDir = getParentDirectory(fileName);

        if (importDataDir == null || !importDataDir.isDirectory()) {
            LOG.warn("Migration Import: No files to import; directory not found for {}", fileName);

            return ret;
        }

        FileFilter fileFilter  = new WildcardFileFilter(fileToImport.getName());
        File[]     importFiles = importDataDir.listFiles(fileFilter);

        if (ArrayUtils.isEmpty(importFiles)) {
            LOG.warn("Migration Import: No files to import");

            return ret;
        }

        Arrays.sort(importFiles);

        LOG.info("Migration Import: zip files for import: ");

        for (File importFile : importFiles) {
            if (isValidImportFile(importFile)) {
                LOG.info(importFile.getName() + " with absolute path - " + importFile.getAbsolutePath());
                ret.add(importFile.getAbsolutePath());
            } else {
                LOG.warn("Ignoring " + importFile.getAbsolutePath() + " as it is not a file or does not end with extension " + FILE_EXTENSION_ZIP);
            }
        }

        return ret;
    }

    /**
     * Returns the absolute parent directory of {@code fileName}, or {@code null} when it has none
     * (e.g. the file system root). Relative names are resolved against the current working directory,
     * so a bare name such as {@code "data.zip"} yields the working directory instead of {@code null}.
     */
    private static File getParentDirectory(String fileName) {
        return new File(fileName).getAbsoluteFile().getParentFile();
    }

    /**
     * @return {@code true} if {@code importFile} is a regular file whose name ends in {@code .zip},
     *         ignoring case
     */
    private boolean isValidImportFile(File importFile) {
        return importFile.isFile() && StringUtils.endsWithIgnoreCase(importFile.getName(), FILE_EXTENSION_ZIP);
    }

    /**
     * Prepares the {@code archive} sub-directory next to {@code fileName}, creating it if necessary.
     * On any permission or creation problem a warning is logged and {@link #archiveDir} is left
     * {@code null}, which disables archiving.
     */
    private void createArchiveDirectory(String fileName) {
        File parentDir = getParentDirectory(fileName);

        if (parentDir == null) {
            LOG.warn("Migration Import: Unable to determine the parent directory of {}; zip files will not be archived", fileName);
            this.archiveDir = null;

            return;
        }

        this.archiveDir = new File(parentDir, ARCHIVE_DIR);

        if (this.archiveDir.exists() && !this.archiveDir.canWrite()) {
            LOG.warn("Migration Import: No write permission to archive directory {}", this.archiveDir.getAbsolutePath());
            this.archiveDir = null;
        } else if (!this.archiveDir.exists() && !parentDir.canWrite()) {
            LOG.warn("Migration Import: No permission to create archive directory {}", this.archiveDir.getAbsolutePath());
            this.archiveDir = null;
        } else if (!this.archiveDir.mkdirs() && !this.archiveDir.isDirectory()) {
            // mkdirs() also returns false when the directory already exists, hence the isDirectory() check
            LOG.warn("Migration Import: Failed to create archive directory {}", this.archiveDir.getAbsolutePath());
            this.archiveDir = null;
        } else {
            LOG.info("Migration Import: archive directory for zip files: {}", this.archiveDir.getAbsolutePath());
        }
    }

    /**
     * Moves an imported zip file into the archive directory (same file name). Does nothing when
     * archiving is disabled. A problem with one file is logged and does not affect archiving of other files.
     */
    private void moveZipFileToArchiveDir(String srcFilePath) {
        if (this.archiveDir == null) {
            return;
        }

        File sourceFile = new File(srcFilePath);

        if (!sourceFile.canWrite()) {
            LOG.warn("Migration Import: No permission to archive the zip file {}", sourceFile.getAbsolutePath());

            return;
        }

        File archivedFile = new File(this.archiveDir, sourceFile.getName());

        if (sourceFile.renameTo(archivedFile)) {
            LOG.info("Migration Import: Successfully archived the zip file: {} to {}", srcFilePath, this.archiveDir.getAbsolutePath());
        } else {
            LOG.warn("Migration Import: Failed to archive the zip file: {}", srcFilePath);
        }
    }

    /**
     * Builds a migration status for {@code fileName} (keyed by the MD5 of its contents), hands it to
     * {@link DataMigrationStatusService#getCreate} and marks the status {@code STARTED}.
     *
     * @param fileName   zip file being imported
     * @param streamSize number of entities expected in the file, stored as the total count
     * @return the status returned by the status service; its current index is the import start position
     * @throws IOException if the file cannot be read to compute its checksum
     */
    private MigrationImportStatus getCreateMigrationStatus(String fileName, int streamSize) throws IOException {
        String fileChecksum;

        try (InputStream in = new FileInputStream(fileName)) {
            fileChecksum = DigestUtils.md5Hex(in);
        }

        MigrationImportStatus status = new MigrationImportStatus(fileName, fileChecksum);

        status.setTotalCount(streamSize);

        MigrationImportStatus statusRetrieved = dataMigrationStatusService.getCreate(status);

        LOG.info("DataMigrationStatusService: Position: {}", statusRetrieved.getCurrentIndex());

        dataMigrationStatusService.setStatus("STARTED");

        return statusRetrieved;
    }

    /**
     * Starts a {@code FileWatcher} on the file before it is imported. What {@code start()} waits for is
     * defined in FileWatcher (not visible here); presumably it blocks until the file is ready — confirm.
     */
    private void detectFileToImport(String fileToImport) throws IOException {
        FileWatcher fileWatcher = new FileWatcher(fileToImport);
        fileWatcher.start();
    }

    /**
     * Reads the expected entity count from the zip file's comment.
     *
     * @return the entity count from the comment, or 1 if the file cannot be opened or the comment
     *         is absent or unusable
     */
    private int getStreamSizeFromComment(String fileToImport) {
        int ret = 1;

        try (ZipFile zipFile = new ZipFile(fileToImport)) {
            ret = processZipFileStreamSizeComment(zipFile.getComment());
        } catch (IOException e) {
            LOG.error("Error opening ZIP file: {}", fileToImport, e);
        }

        return ret;
    }

    /**
     * Parses a zip comment of the form {@code {"entitiesCount": N, "total": M}}.
     *
     * @return {@code entitiesCount}, or 1 when the comment is empty, is not a JSON object, or lacks a
     *         numeric {@code entitiesCount}
     */
    private int processZipFileStreamSizeComment(String comment) {
        if (StringUtils.isEmpty(comment)) {
            return 1;
        }

        Map<?, ?> map = AtlasType.fromJson(comment, Map.class);

        if (map == null) {
            LOG.warn("ZipFileMigrationImporter: Zip file: Comment could not be parsed: {}", comment);

            return 1;
        }

        // JSON numbers may be deserialized as Integer or Long, so go through Number rather than casting to int
        Object entitiesCount = map.get(ZIP_FILE_COMMENT_ENTITIES_COUNT);
        Object totalCount    = map.get(ZIP_FILE_COMMENT_TOTAL_COUNT);

        LOG.info("ZipFileMigrationImporter: Zip file: Comment: streamSize: {}: total: {}", entitiesCount, totalCount);

        if (!(entitiesCount instanceof Number)) {
            LOG.warn("ZipFileMigrationImporter: Zip file: Comment has no numeric {}: {}", ZIP_FILE_COMMENT_ENTITIES_COUNT, comment);

            return 1;
        }

        return ((Number) entitiesCount).intValue();
    }

    /**
     * Runs the migration import of one zip file through {@link ImportService}, as the user named by the
     * {@code user.name} system property. The input stream is closed once the import returns.
     *
     * @param fileToImport  zip file to import
     * @param streamSize    expected number of entities in the file
     * @param startPosition position to start importing from
     * @throws AtlasBaseException if the import fails; other exceptions are wrapped
     */
    private void performImport(String fileToImport, int streamSize, String startPosition) throws AtlasBaseException {
        LOG.info("Migration Import: {}: Starting at: {}...", fileToImport, startPosition);

        try (InputStream fs = new FileInputStream(fileToImport)) {
            String userName = getUserNameFromEnvironment();

            RequestContext.get().setUser(userName, null);

            AtlasImportRequest request   = getImportRequest(fileToImport, streamSize, startPosition);
            InetAddress        localHost = InetAddress.getLocalHost();

            importService.run(fs, request, userName, localHost.getHostName(), localHost.getHostAddress());
        } catch (AtlasBaseException ex) {
            LOG.error("Migration Import: Error loading zip for migration!", ex);
            throw ex;
        } catch (Exception ex) {
            LOG.error("Migration Import: Error loading zip for migration!", ex);
            throw new AtlasBaseException(ex);
        } finally {
            LOG.info("Migration Import: {}: Done!", fileToImport);
        }
    }

    /**
     * @return the JVM system property {@code user.name} (a system property, not an environment variable)
     */
    private String getUserNameFromEnvironment() {
        return System.getProperty(ENV_USER_NAME);
    }

    /**
     * Builds the migration-mode import request: file name, stream size, worker count and batch size
     * (from application properties, with defaults), and the start position ({@code "0"} if empty).
     */
    private AtlasImportRequest getImportRequest(String fileToImport, int streamSize, String position) throws AtlasException {
        AtlasImportRequest request = new AtlasImportRequest();

        request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION_FILE_NAME, fileToImport);
        request.setSizeOption(streamSize);
        request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION, "true");
        request.setOption(AtlasImportRequest.OPTION_KEY_NUM_WORKERS, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS, DEFAULT_NUMBER_OF_WORKERS));
        request.setOption(AtlasImportRequest.OPTION_KEY_BATCH_SIZE, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE, DEFAULT_BATCH_SIZE));
        request.setOption(AtlasImportRequest.START_POSITION_KEY, (StringUtils.isEmpty(position) ? "0" : position));

        return request;
    }

    /**
     * Reads a string application property, returning {@code defaultValue} when it is not set.
     */
    private String getPropertyValue(String property, String defaultValue) throws AtlasException {
        return ApplicationProperties.get().getString(property, defaultValue);
    }
}