blob: a69f7faa0afc2cb2b76340aaa0fa6f2d411e532c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.resources;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.codehaus.jackson.type.TypeReference;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import static org.apache.atlas.AtlasErrorCode.JSON_ERROR_OBJECT_MAPPER_NULL_RETURNED;
public class ZipSource implements EntityImportStream {
private static final Logger LOG = LoggerFactory.getLogger(ZipSource.class);
private final ByteArrayInputStream inputStream;
private List<String> creationOrder;
private Iterator<String> iterator;
private Map<String, String> guidEntityJsonMap;
public ZipSource(ByteArrayInputStream inputStream) throws IOException {
this.inputStream = inputStream;
guidEntityJsonMap = new HashMap<>();
updateGuidZipEntryMap();
this.setCreationOrder();
}
public AtlasTypesDef getTypesDef() throws AtlasBaseException {
final String fileName = ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString();
String s = getFromCache(fileName);
return convertFromJson(AtlasTypesDef.class, s);
}
private void setCreationOrder() {
String fileName = ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString();
try {
String s = getFromCache(fileName);
this.creationOrder = convertFromJson(new TypeReference<List<String>>(){}, s);
this.iterator = this.creationOrder.iterator();
} catch (AtlasBaseException e) {
LOG.error(String.format("Error retrieving '%s' from zip.", fileName), e);
}
}
private void updateGuidZipEntryMap() throws IOException {
inputStream.reset();
ZipInputStream zipInputStream = new ZipInputStream(inputStream);
ZipEntry zipEntry = zipInputStream.getNextEntry();
while (zipEntry != null) {
String entryName = zipEntry.getName().replace(".json", "");
if (guidEntityJsonMap.containsKey(entryName)) continue;
byte[] buf = new byte[1024];
int n = 0;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
while ((n = zipInputStream.read(buf, 0, 1024)) > -1) {
bos.write(buf, 0, n);
}
guidEntityJsonMap.put(entryName, bos.toString());
zipEntry = zipInputStream.getNextEntry();
}
zipInputStream.close();
}
public List<String> getCreationOrder() throws AtlasBaseException {
return this.creationOrder;
}
public AtlasEntity getEntity(String guid) throws AtlasBaseException {
String s = getFromCache(guid);
return convertFromJson(AtlasEntity.class, s);
}
private <T> T convertFromJson(TypeReference clazz, String jsonData) throws AtlasBaseException {
try {
ObjectMapper mapper = new ObjectMapper();
T ret = mapper.readValue(jsonData, clazz);
if(ret == null) {
throw new AtlasBaseException(JSON_ERROR_OBJECT_MAPPER_NULL_RETURNED, clazz.toString());
}
return ret;
} catch (Exception e) {
throw new AtlasBaseException("Error converting file to JSON.", e);
}
}
private <T> T convertFromJson(Class<T> clazz, String jsonData) throws AtlasBaseException {
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(jsonData, clazz);
} catch (Exception e) {
throw new AtlasBaseException("Error converting file to JSON.", e);
}
}
private String getFromCache(String entryName) {
if(!guidEntityJsonMap.containsKey(entryName)) return "";
return guidEntityJsonMap.get(entryName).toString();
}
public void close() {
try {
inputStream.close();
guidEntityJsonMap.clear();
}
catch(IOException ex) {
LOG.warn("{}: Error closing streams.");
}
}
@Override
public boolean hasNext() {
return this.iterator.hasNext();
}
@Override
public AtlasEntity next() {
try {
return getEntity(this.iterator.next());
} catch (AtlasBaseException e) {
e.printStackTrace();
return null;
}
}
@Override
public void reset() {
try {
getCreationOrder();
this.iterator = this.creationOrder.iterator();
} catch (AtlasBaseException e) {
LOG.error("reset", e);
}
}
@Override
public AtlasEntity getByGuid(String guid) {
try {
return getEntity(guid);
} catch (AtlasBaseException e) {
e.printStackTrace();
return null;
}
}
@Override
public void onImportComplete(String guid) {
if(guid != null) {
guidEntityJsonMap.remove(guid);
}
}
}