blob: edb816fc822ef44890027dc21d7c411c95354373 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
public class ZipSource implements EntityImportStream {
private static final Logger LOG = LoggerFactory.getLogger(ZipSource.class);
private final InputStream inputStream;
private List<String> creationOrder;
private Iterator<String> iterator;
private Map<String, String> guidEntityJsonMap;
private ImportTransforms importTransform;
private int currentPosition;
public ZipSource(InputStream inputStream) throws IOException {
this(inputStream, null);
}
public ZipSource(InputStream inputStream, ImportTransforms importTransform) throws IOException {
this.inputStream = inputStream;
this.guidEntityJsonMap = new HashMap<>();
this.importTransform = importTransform;
updateGuidZipEntryMap();
setCreationOrder();
}
public ImportTransforms getImportTransform() { return this.importTransform; }
public void setImportTransform(ImportTransforms importTransform) {
this.importTransform = importTransform;
}
public AtlasTypesDef getTypesDef() throws AtlasBaseException {
final String fileName = ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString();
String s = (String) getFromCache(fileName);
return convertFromJson(AtlasTypesDef.class, s);
}
public AtlasExportResult getExportResult() throws AtlasBaseException {
final String fileName = ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString();
String s = getFromCache(fileName);
return convertFromJson(AtlasExportResult.class, s);
}
private void setCreationOrder() {
String fileName = ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString();
try {
String s = getFromCache(fileName);
this.creationOrder = convertFromJson(List.class, s);
this.iterator = this.creationOrder.iterator();
} catch (AtlasBaseException e) {
LOG.error(String.format("Error retrieving '%s' from zip.", fileName), e);
}
}
private void updateGuidZipEntryMap() throws IOException {
ZipInputStream zipInputStream = new ZipInputStream(inputStream);
ZipEntry zipEntry = zipInputStream.getNextEntry();
while (zipEntry != null) {
String entryName = zipEntry.getName().replace(".json", "");
if (guidEntityJsonMap.containsKey(entryName)) continue;
byte[] buf = new byte[1024];
int n = 0;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
while ((n = zipInputStream.read(buf, 0, 1024)) > -1) {
bos.write(buf, 0, n);
}
guidEntityJsonMap.put(entryName, bos.toString());
zipEntry = zipInputStream.getNextEntry();
}
zipInputStream.close();
}
public List<String> getCreationOrder() throws AtlasBaseException {
return this.creationOrder;
}
public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException {
String s = getFromCache(guid);
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, s);
if (importTransform != null) {
entityWithExtInfo = importTransform.apply(entityWithExtInfo);
}
return entityWithExtInfo;
}
private <T> T convertFromJson(Class<T> clazz, String jsonData) throws AtlasBaseException {
try {
return AtlasType.fromJson(jsonData, clazz);
} catch (Exception e) {
throw new AtlasBaseException("Error converting file to JSON.", e);
}
}
private String getFromCache(String entryName) {
return guidEntityJsonMap.get(entryName);
}
public void close() {
try {
inputStream.close();
guidEntityJsonMap.clear();
}
catch(IOException ex) {
LOG.warn("{}: Error closing streams.");
}
}
@Override
public boolean hasNext() {
return this.iterator.hasNext();
}
@Override
public AtlasEntity next() {
AtlasEntityWithExtInfo entityWithExtInfo = getNextEntityWithExtInfo();
return entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
}
@Override
public AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
try {
currentPosition++;
return getEntityWithExtInfo(this.iterator.next());
} catch (AtlasBaseException e) {
e.printStackTrace();
return null;
}
}
@Override
public void reset() {
try {
getCreationOrder();
this.iterator = this.creationOrder.iterator();
} catch (AtlasBaseException e) {
LOG.error("reset", e);
}
}
@Override
public AtlasEntity getByGuid(String guid) {
try {
AtlasEntity entity = getEntity(guid);
return entity;
} catch (AtlasBaseException e) {
e.printStackTrace();
return null;
}
}
private AtlasEntity getEntity(String guid) throws AtlasBaseException {
if(guidEntityJsonMap.containsKey(guid)) {
AtlasEntityWithExtInfo extInfo = getEntityWithExtInfo(guid);
return (extInfo != null) ? extInfo.getEntity() : null;
}
return null;
}
public int size() {
return this.creationOrder.size();
}
@Override
public void onImportComplete(String guid) {
guidEntityJsonMap.remove(guid);
}
@Override
public void setPosition(int index) {
currentPosition = index;
reset();
for (int i = 0; i < creationOrder.size() && i <= index; i++) {
iterator.next();
}
}
@Override
public void setPositionUsingEntityGuid(String guid) {
if(StringUtils.isBlank(guid)) {
return;
}
int index = creationOrder.indexOf(guid);
if (index == -1) {
return;
}
setPosition(index);
}
@Override
public int getPosition() {
return currentPosition;
}
}