| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.atlas.repository.impexp; |
| |
| import org.apache.atlas.exception.AtlasBaseException; |
| import org.apache.atlas.model.impexp.AtlasExportResult; |
| import org.apache.atlas.model.instance.AtlasEntity; |
| import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; |
| import org.apache.atlas.model.typedef.AtlasTypesDef; |
| import org.apache.atlas.repository.store.graph.v1.EntityImportStream; |
| import org.apache.atlas.type.AtlasType; |
| import org.apache.commons.lang.StringUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.zip.ZipEntry; |
| import java.util.zip.ZipInputStream; |
| |
| |
| public class ZipSource implements EntityImportStream { |
| private static final Logger LOG = LoggerFactory.getLogger(ZipSource.class); |
| |
| private final InputStream inputStream; |
| private List<String> creationOrder; |
| private Iterator<String> iterator; |
| private Map<String, String> guidEntityJsonMap; |
| private ImportTransforms importTransform; |
| private int currentPosition; |
| |
| public ZipSource(InputStream inputStream) throws IOException { |
| this(inputStream, null); |
| } |
| |
| public ZipSource(InputStream inputStream, ImportTransforms importTransform) throws IOException { |
| this.inputStream = inputStream; |
| this.guidEntityJsonMap = new HashMap<>(); |
| this.importTransform = importTransform; |
| |
| updateGuidZipEntryMap(); |
| setCreationOrder(); |
| } |
| |
| public ImportTransforms getImportTransform() { return this.importTransform; } |
| |
| public void setImportTransform(ImportTransforms importTransform) { |
| this.importTransform = importTransform; |
| } |
| |
| public AtlasTypesDef getTypesDef() throws AtlasBaseException { |
| final String fileName = ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString(); |
| |
| String s = (String) getFromCache(fileName); |
| return convertFromJson(AtlasTypesDef.class, s); |
| } |
| |
| public AtlasExportResult getExportResult() throws AtlasBaseException { |
| final String fileName = ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString(); |
| |
| String s = getFromCache(fileName); |
| return convertFromJson(AtlasExportResult.class, s); |
| } |
| |
| private void setCreationOrder() { |
| String fileName = ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString(); |
| |
| try { |
| String s = getFromCache(fileName); |
| this.creationOrder = convertFromJson(List.class, s); |
| this.iterator = this.creationOrder.iterator(); |
| } catch (AtlasBaseException e) { |
| LOG.error(String.format("Error retrieving '%s' from zip.", fileName), e); |
| } |
| } |
| |
| private void updateGuidZipEntryMap() throws IOException { |
| |
| ZipInputStream zipInputStream = new ZipInputStream(inputStream); |
| ZipEntry zipEntry = zipInputStream.getNextEntry(); |
| while (zipEntry != null) { |
| String entryName = zipEntry.getName().replace(".json", ""); |
| |
| if (guidEntityJsonMap.containsKey(entryName)) continue; |
| |
| byte[] buf = new byte[1024]; |
| |
| int n = 0; |
| ByteArrayOutputStream bos = new ByteArrayOutputStream(); |
| while ((n = zipInputStream.read(buf, 0, 1024)) > -1) { |
| bos.write(buf, 0, n); |
| } |
| |
| guidEntityJsonMap.put(entryName, bos.toString()); |
| zipEntry = zipInputStream.getNextEntry(); |
| |
| } |
| |
| zipInputStream.close(); |
| } |
| |
| public List<String> getCreationOrder() throws AtlasBaseException { |
| return this.creationOrder; |
| } |
| |
| public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException { |
| String s = getFromCache(guid); |
| AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, s); |
| |
| if (importTransform != null) { |
| entityWithExtInfo = importTransform.apply(entityWithExtInfo); |
| } |
| |
| return entityWithExtInfo; |
| } |
| |
| private <T> T convertFromJson(Class<T> clazz, String jsonData) throws AtlasBaseException { |
| try { |
| return AtlasType.fromJson(jsonData, clazz); |
| |
| } catch (Exception e) { |
| throw new AtlasBaseException("Error converting file to JSON.", e); |
| } |
| } |
| |
| private String getFromCache(String entryName) { |
| return guidEntityJsonMap.get(entryName); |
| } |
| |
| public void close() { |
| try { |
| inputStream.close(); |
| guidEntityJsonMap.clear(); |
| } |
| catch(IOException ex) { |
| LOG.warn("{}: Error closing streams."); |
| } |
| } |
| |
| @Override |
| public boolean hasNext() { |
| return this.iterator.hasNext(); |
| } |
| |
| @Override |
| public AtlasEntity next() { |
| AtlasEntityWithExtInfo entityWithExtInfo = getNextEntityWithExtInfo(); |
| |
| return entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null; |
| } |
| |
| @Override |
| public AtlasEntityWithExtInfo getNextEntityWithExtInfo() { |
| try { |
| currentPosition++; |
| return getEntityWithExtInfo(this.iterator.next()); |
| } catch (AtlasBaseException e) { |
| e.printStackTrace(); |
| return null; |
| } |
| } |
| |
| @Override |
| public void reset() { |
| try { |
| getCreationOrder(); |
| this.iterator = this.creationOrder.iterator(); |
| } catch (AtlasBaseException e) { |
| LOG.error("reset", e); |
| } |
| } |
| |
| @Override |
| public AtlasEntity getByGuid(String guid) { |
| try { |
| AtlasEntity entity = getEntity(guid); |
| return entity; |
| } catch (AtlasBaseException e) { |
| e.printStackTrace(); |
| return null; |
| } |
| } |
| |
| private AtlasEntity getEntity(String guid) throws AtlasBaseException { |
| if(guidEntityJsonMap.containsKey(guid)) { |
| AtlasEntityWithExtInfo extInfo = getEntityWithExtInfo(guid); |
| return (extInfo != null) ? extInfo.getEntity() : null; |
| } |
| |
| return null; |
| } |
| |
| public int size() { |
| return this.creationOrder.size(); |
| } |
| |
| @Override |
| public void onImportComplete(String guid) { |
| guidEntityJsonMap.remove(guid); |
| } |
| |
| |
| @Override |
| public void setPosition(int index) { |
| currentPosition = index; |
| reset(); |
| for (int i = 0; i < creationOrder.size() && i <= index; i++) { |
| iterator.next(); |
| } |
| } |
| |
| @Override |
| public void setPositionUsingEntityGuid(String guid) { |
| if(StringUtils.isBlank(guid)) { |
| return; |
| } |
| |
| int index = creationOrder.indexOf(guid); |
| if (index == -1) { |
| return; |
| } |
| |
| setPosition(index); |
| } |
| |
| @Override |
| public int getPosition() { |
| return currentPosition; |
| } |
| |
| |
| } |