| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.usergrid.tools; |
| |
| |
| import static org.apache.usergrid.management.AccountCreationProps.PROPERTIES_USERGRID_BINARY_UPLOADER; |
| |
| import java.io.BufferedOutputStream; |
| import java.io.BufferedWriter; |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.FileWriter; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.io.UnsupportedEncodingException; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.commons.cli.CommandLine; |
| import org.apache.commons.cli.MissingOptionException; |
| import org.apache.commons.cli.Option; |
| import org.apache.commons.cli.OptionBuilder; |
| import org.apache.commons.cli.Options; |
| import org.apache.usergrid.corepersistence.ManagerCache; |
| import org.apache.usergrid.corepersistence.export.ExportRequestBuilder; |
| import org.apache.usergrid.corepersistence.export.ExportRequestBuilderImpl; |
| import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable; |
| import org.apache.usergrid.corepersistence.rx.impl.EdgeScope; |
| import org.apache.usergrid.corepersistence.util.CpNamingUtils; |
| import org.apache.usergrid.management.OrganizationInfo; |
| import org.apache.usergrid.management.UserInfo; |
| import org.apache.usergrid.persistence.Entity; |
| import org.apache.usergrid.persistence.EntityManager; |
| import org.apache.usergrid.persistence.Results; |
| import org.apache.usergrid.persistence.SimpleEntityRef; |
| import org.apache.usergrid.persistence.collection.EntityCollectionManager; |
| import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; |
| import org.apache.usergrid.persistence.core.scope.ApplicationScope; |
| import org.apache.usergrid.persistence.graph.GraphManager; |
| import org.apache.usergrid.persistence.graph.SearchByEdgeType; |
| import org.apache.usergrid.persistence.graph.impl.SimpleEdge; |
| import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; |
| import org.apache.usergrid.persistence.model.entity.Id; |
| import org.apache.usergrid.persistence.model.entity.SimpleId; |
| import org.apache.usergrid.services.assets.BinaryStoreFactory; |
| import org.apache.usergrid.services.assets.data.BinaryStore; |
| import org.apache.usergrid.tools.bean.ExportOrg; |
| import org.json.JSONObject; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.springframework.beans.factory.annotation.Autowired; |
| |
| import com.fasterxml.jackson.core.JsonFactory; |
| import com.fasterxml.jackson.core.JsonGenerator; |
| import com.google.common.base.Optional; |
| import com.google.common.collect.BiMap; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| import com.google.gson.Gson; |
| import com.google.gson.GsonBuilder; |
| |
| import rx.Observable; |
| import rx.observables.ConnectableObservable; |
| import rx.schedulers.Schedulers; |
| |
| |
| public class Export extends ExportingToolBase { |
| |
| static final Logger logger = LoggerFactory.getLogger( Export.class ); |
| private static final String ENTITY_FETCHER_THREADS = "entityFetchThreads"; |
| private static final String ENTITY_MEMBER_FETCHER_MULT = "entityThreadMult"; |
| |
| |
| @Autowired |
| private BinaryStoreFactory binaryStoreFactory; |
| |
| JsonFactory jsonFactory = new JsonFactory(); |
| |
| private AllEntityIdsObservable allEntityIdsObs; |
| private SimpleEdge lastEdge = null; |
| |
| //number of threads for fetching entity contents. Each thread will handle a batch of 1000 entity ids |
| private int entityFetcherThreads = 50; |
| //after an individual entity is fetched, the entity members like assets, connections etc need to be fetched |
| //depending on how heavy the assets/connections might be, we might need to multiply the factor so that more threads are allocated |
| //for pulling the members quickly without the queue backing up. |
| private int entityMemberFetcherMultiplier = 1; |
| |
| |
| //TODO : Add blocking queues for these executors where appropriate |
| private ExecutorService orgAppCollParallelizer; |
| |
| //fetches the entity content |
| private ExecutorService entityFetcher; |
| //fetches the entity members like connections etc for a given entity |
| private ExecutorService entityMemberFetcher; |
| //fetches the assets for a given entity |
| private ExecutorService assetsFetcher; |
| |
| |
| @Override |
| @SuppressWarnings("static-access") |
| public Options createOptions() { |
| |
| Options options = super.createOptions(); |
| |
| |
| Option entityFetcherThreads = OptionBuilder.withArgName( ENTITY_FETCHER_THREADS ).hasArg() |
| .withDescription( "Number of threads to fetch entities in parallel (defaults to 50)" ).create( ENTITY_FETCHER_THREADS ); |
| options.addOption( entityFetcherThreads); |
| |
| Option entityMemberFetcherMultiplier = OptionBuilder.withArgName( ENTITY_MEMBER_FETCHER_MULT ).hasArg() |
| .withDescription( "This defines the number of threads for fetching entity members like assets/collections by multiplying the number of entity fetcher threads. Defaults to 1" ).create( ENTITY_MEMBER_FETCHER_MULT ); |
| options.addOption( entityMemberFetcherMultiplier); |
| |
| return options; |
| } |
| |
| @Override |
| protected void validateOptions(CommandLine line) throws MissingOptionException { |
| super.validateOptions(line); |
| |
| if (line.hasOption(ENTITY_FETCHER_THREADS)) { |
| try { |
| Integer.parseInt(line.getOptionValue(ENTITY_FETCHER_THREADS)); |
| } catch (NumberFormatException e) { |
| throw new MissingOptionException("Entity fetcher threads need to be a positive integer"); |
| } |
| } |
| |
| if (line.hasOption(ENTITY_MEMBER_FETCHER_MULT)) { |
| try { |
| Integer.parseInt(line.getOptionValue(ENTITY_MEMBER_FETCHER_MULT)); |
| } catch (NumberFormatException e) { |
| throw new MissingOptionException("Entity member thread multiplier needs to be a positive integer"); |
| } |
| } |
| |
| } |
| |
| @Override |
| protected void applyExportParams(CommandLine line) { |
| |
| super.applyExportParams(line); |
| |
| if (line.hasOption(ENTITY_FETCHER_THREADS)) { |
| entityFetcherThreads = Integer.parseInt(line.getOptionValue(ENTITY_FETCHER_THREADS)); |
| |
| if (entityFetcherThreads < 1) { |
| entityFetcherThreads = 50; |
| } |
| } |
| |
| if (line.hasOption(ENTITY_MEMBER_FETCHER_MULT)) { |
| entityMemberFetcherMultiplier = Integer.parseInt(line.getOptionValue(ENTITY_MEMBER_FETCHER_MULT)); |
| |
| if (entityMemberFetcherMultiplier < 1) { |
| entityMemberFetcherMultiplier = 1; |
| } |
| if (entityMemberFetcherMultiplier > 5) { |
| entityMemberFetcherMultiplier = 5; |
| } |
| } |
| |
| orgAppCollParallelizer = Executors.newFixedThreadPool(5, new ThreadFactoryBuilder().setNameFormat("OrgAppColl-Parallelizer-%d").build()); |
| entityFetcher = Executors.newFixedThreadPool(entityFetcherThreads, new ThreadFactoryBuilder().setNameFormat("Export-EntityFetcher-%d").build()); |
| entityMemberFetcher = Executors.newFixedThreadPool(entityFetcherThreads * entityMemberFetcherMultiplier, new ThreadFactoryBuilder().setNameFormat("Export-EntityMemberFetcher-%d").build()); |
| assetsFetcher = Executors.newFixedThreadPool(entityFetcherThreads * entityMemberFetcherMultiplier, new ThreadFactoryBuilder().setNameFormat("Export-AssetFetcher-%d").build()); |
| |
| } |
| |
| @Override |
| public void runTool( CommandLine line ) throws Exception { |
| |
| startSpring(); |
| setVerbose( line ); |
| |
| Gson gson = new GsonBuilder().create(); |
| |
| this.allEntityIdsObs = injector.getInstance(AllEntityIdsObservable.class); |
| applyExportParams(line); |
| prepareBaseOutputFileName( line ); |
| |
| |
| if(lastEdgeJson != null) { |
| JSONObject lastEdgeJsonObj = new JSONObject(lastEdgeJson); |
| UUID uuid = UUID.fromString(lastEdgeJsonObj.getJSONObject("sourceNode").getString("uuid")); |
| Id sourceId = new SimpleId(uuid, lastEdgeJsonObj.getJSONObject("sourceNode").getString("type")); |
| uuid = UUID.fromString(lastEdgeJsonObj.getJSONObject("targetNode").getString("uuid")); |
| Id targetId = new SimpleId(uuid, lastEdgeJsonObj.getJSONObject("targetNode").getString("type")); |
| lastEdge = new SimpleEdge(sourceId, lastEdgeJsonObj.getString("type"), targetId, lastEdgeJsonObj.getLong("timestamp")); |
| } |
| |
| outputDir = createOutputParentDir(); |
| logger.info( "Export directory: " + outputDir.getAbsolutePath() ); |
| |
| |
| // Export organizations separately. |
| exportOrganizations(); |
| |
| logger.info("Finished export waiting for threads to end."); |
| |
| while(true) { |
| try { |
| //Spinning to prevent program execution from ending. |
| //Need to replace with some kind of countdown latch or task tracker |
| Thread.sleep(10000); |
| } catch (InterruptedException e) { |
| logger.error("Exception while waiting for export to complete.",e); |
| } |
| } |
| |
| } |
| |
| |
| |
| private void exportOrganizations() throws Exception, UnsupportedEncodingException { |
| |
| for (Entry<UUID, String> organizationName : getOrgs().entrySet()) { |
| |
| // Let's skip the test entities. |
| if (organizationName.equals(properties.getProperty("usergrid.test-account.organization"))) { |
| continue; |
| } |
| |
| OrganizationInfo orgInfo = managementService.getOrganizationByUuid(organizationName.getKey()); |
| logger.info("Exporting Organization: " + orgInfo.getName()); |
| |
| ExportOrg exportOrg = new ExportOrg(orgInfo); |
| |
| List<UserInfo> users = managementService.getAdminUsersForOrganization(organizationName.getKey()); |
| |
| for (UserInfo user : users) { |
| exportOrg.addAdmin(user.getUsername()); |
| } |
| |
| File orgDir = createOrgDir(orgInfo.getName()); |
| |
| // One file per Organization. |
| saveOrganizationMetadata(orgDir, exportOrg); |
| |
| exportApplicationsForOrg(orgDir, organizationName.getKey(), organizationName.getValue()); |
| } |
| } |
| |
| /** |
| * Serialize an Organization into a json file. |
| * @param orgDir |
| * |
| * @param acc OrganizationInfo |
| */ |
| private void saveOrganizationMetadata( File orgDir, ExportOrg acc ) { |
| |
| try { |
| |
| File outFile = createOutputFile( orgDir, "organization", acc.getName() ); |
| com.fasterxml.jackson.core.JsonGenerator jg = getJsonGenerator( outFile ); |
| jg.writeObject( acc ); |
| jg.close(); |
| } |
| catch ( Exception e ) { |
| throw new RuntimeException( e ); |
| } |
| } |
| |
| |
| private Map<UUID, String> getOrgs() throws Exception { |
| // Loop through the organizations |
| Map<UUID, String> organizationNames = null; |
| |
| if ( orgId == null && (orgName == null || orgName.trim().equals(""))) { |
| organizationNames = managementService.getOrganizations(); |
| } |
| else { |
| OrganizationInfo info = null; |
| |
| if( orgId != null ) { |
| info = managementService.getOrganizationByUuid( orgId ); |
| } |
| else { |
| info = managementService.getOrganizationByName( orgName ); |
| } |
| |
| if ( info == null ) { |
| logger.error( "Organization info is null!" ); |
| System.exit( 1 ); |
| } |
| |
| organizationNames = new HashMap<UUID, String>(); |
| organizationNames.put( info.getUuid(), info.getName() ); |
| } |
| |
| return organizationNames; |
| } |
| |
| private void exportApplicationsForOrg(File orgDir, UUID orgId, String orgName ) throws Exception { |
| |
| logger.info("Exporting applications for {} : {} ",orgId, orgName); |
| |
| // Loop through the applications per organization |
| BiMap<UUID, String> applications = managementService.getApplicationsForOrganization( orgId ); |
| |
| if ( applicationId == null && (applicationName == null || applicationName.trim().equals(""))) { |
| //export all apps as appId or name is not provided |
| |
| Observable.from(applications.entrySet()) |
| .subscribeOn(Schedulers.from(orgAppCollParallelizer)) |
| .subscribe(appEntry -> { |
| UUID appId = appEntry.getKey(); |
| String appName = appEntry.getValue().split("/")[1]; |
| try { |
| exportApplication(orgDir, appId, appName); |
| } catch (Exception e) { |
| logger.error("There was an exception exporting application {} : {}",appName, appId, e); |
| } |
| }); |
| |
| } |
| else { |
| |
| UUID appId = applicationId; |
| String appName = applicationName; |
| |
| if( applicationId != null ) { |
| appName = applications.get(appId); |
| } |
| else { |
| appId = applications.inverse().get(orgName+'/'+appName); |
| } |
| |
| try { |
| exportApplication(orgDir, appId, appName); |
| } catch (Exception e) { |
| logger.error("There was an exception exporting application {} : {}",appName, appId, e); |
| } |
| |
| } |
| } |
| |
| private void exportApplication(File orgDir, UUID appId, String appName) throws Exception { |
| |
| |
| logger.info( "Starting application export for {} : {} ",appName, appId ); |
| File appDir = createApplicationDir(orgDir, appName); |
| |
| JsonGenerator jg = |
| getJsonGenerator( createOutputFile( appDir, "application", appName) ); |
| |
| // load the dictionary |
| EntityManager rootEm = emf.getEntityManager( emf.getManagementAppId() ); |
| |
| Entity appEntity = rootEm.get( new SimpleEntityRef( "org_application", appId)); |
| |
| Map<String, Object> dictionaries = new HashMap<String, Object>(); |
| |
| for ( String dictionary : rootEm.getDictionaries( appEntity ) ) { |
| Map<Object, Object> dict = rootEm.getDictionaryAsMap( appEntity, dictionary ); |
| |
| // nothing to do |
| if ( dict.isEmpty() ) { |
| continue; |
| } |
| |
| dictionaries.put( dictionary, dict ); |
| } |
| |
| EntityManager em = emf.getEntityManager( appId); |
| |
| // Get application |
| Entity nsEntity = em.get( new SimpleEntityRef( "application", appId)); |
| |
| Set<String> collections = em.getApplicationCollections(); |
| |
| // load app counters |
| |
| Map<String, Long> entityCounters = em.getApplicationCounters(); |
| |
| //nsEntity.setMetadata( "organization", orgName ); |
| nsEntity.setMetadata( "dictionaries", dictionaries ); |
| // counters for collections |
| nsEntity.setMetadata( "counters", entityCounters ); |
| nsEntity.setMetadata( "collections", collections ); |
| |
| jg.writeStartArray(); |
| jg.writeObject( nsEntity ); |
| jg.close(); |
| |
| if ( collNames == null || collNames.length <= 0) { |
| //export all collections as collection names are not provided |
| |
| Observable.from(collections) |
| .subscribeOn(Schedulers.from(orgAppCollParallelizer)) |
| .subscribe(collectionName -> { |
| exportCollection(appDir, appId, collectionName, em); |
| }); |
| |
| } |
| else { |
| Observable.from(collNames) |
| .subscribeOn(Schedulers.from(orgAppCollParallelizer)) |
| .subscribe(collectionName -> { |
| if(collections.contains(collectionName)) { |
| exportCollection(appDir, appId, collectionName, em); |
| } |
| }); |
| } |
| |
| } |
| |
| private void exportCollection(File appDir, UUID appId, String collectionName, EntityManager em) { |
| File collectionDir = createCollectionDir(appDir, collectionName); |
| extractEntityIdsForCollection(collectionDir, appId, collectionName); |
| } |
| |
| private void extractEntityIdsForCollection(File collectionDir, UUID applicationId, String collectionName) { |
| |
| AtomicInteger batch = new AtomicInteger(1); |
| |
| final EntityManager rootEm = emf.getEntityManager(applicationId); |
| final Gson gson = new GsonBuilder().create(); |
| ManagerCache managerCache = injector.getInstance(ManagerCache.class); |
| ExportRequestBuilder builder = new ExportRequestBuilderImpl().withApplicationId(applicationId); |
| final ApplicationScope appScope = builder.getApplicationScope().get(); |
| GraphManager gm = managerCache.getGraphManager(appScope); |
| EntityCollectionManagerFactory entityCollectionManagerFactory = injector |
| .getInstance(EntityCollectionManagerFactory.class); |
| final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(appScope); |
| |
| ExecutorService entityIdWriter = Executors.newFixedThreadPool(1); |
| allEntityIdsObs |
| .getEdgesToEntities(Observable.just(CpNamingUtils.getApplicationScope(applicationId)),Optional.fromNullable(CpNamingUtils.getEdgeTypeFromCollectionName(collectionName.toLowerCase())),(this.lastEdge == null ? Optional.absent() : Optional.fromNullable(lastEdge))) |
| .buffer(1000) |
| .finallyDo(()-> { |
| entityIdWriter.shutdown(); |
| logger.info("Finished fetching entity ids for {}. Shutting down entity id writer executor ", collectionName); |
| while(!entityIdWriter.isTerminated()) { |
| try { |
| entityIdWriter.awaitTermination(10, TimeUnit.SECONDS); |
| } catch (InterruptedException e) { |
| } |
| } |
| logger.info("Entity id writer executor terminated after shutdown for {}", collectionName); |
| }) |
| .subscribe(edges -> { |
| |
| logger.info("For collection {}" , collectionName); |
| Integer batchId = batch.getAndIncrement(); |
| logger.info("Started fetching details for collection {} batch {} ", collectionName, batchId); |
| Observable.just(edges) |
| .subscribeOn(Schedulers.from(entityIdWriter)) |
| .subscribe(edgeScopes -> { |
| |
| List<UUID> entityIds = new ArrayList<UUID>(1000); |
| |
| for (EdgeScope edgeScope : edgeScopes) { |
| // write to file |
| Id entityId = edgeScope.getEdge().getTargetNode(); |
| if (entityId != null) { |
| entityIds.add(entityId.getUuid()); |
| } else { |
| edgeScopes.remove(edgeScope); |
| } |
| } |
| // extract name for this batch |
| try { |
| writeEntityIdsBatch(collectionDir, edgeScopes, batchId, collectionName); |
| String type = edgeScopes.get(0).getEdge().getTargetNode().getType(); |
| |
| Observable.just(entityIds) |
| .subscribeOn(Schedulers.from(entityFetcher)) // change to |
| .subscribe(entIds -> { |
| |
| // get entities here |
| logger.info("entIds count {} for type {}", entIds.size(), type); |
| Results entities = rootEm.getEntities(entIds, type); |
| int size = entities.getEntities().size(); |
| logger.info("Got {} entities.", size); |
| |
| if(!skipConnections || !skipDictionaries || !skipAssets) { |
| |
| ConnectableObservable<Results> entityObs = Observable.just(entities) |
| .publish(); |
| entityObs.subscribeOn(Schedulers.from(entityMemberFetcher)); |
| |
| |
| // fetch and write connections |
| if(!skipConnections) { |
| entityObs.subscribe(entity -> { |
| fetchConnections(gm, ecm, entity, collectionDir, collectionName, batchId, gson); |
| |
| }); |
| } |
| // fetch and write dictionaries |
| if(!skipDictionaries) { |
| entityObs.subscribe(entity -> { |
| fetchDictionaries(collectionDir, collectionName, rootEm, |
| entity, gson, batchId); |
| }); |
| } |
| |
| if(!skipAssets) { |
| File assetsDir = createDir(collectionDir.getAbsolutePath(), "files"); |
| entityObs.subscribe(entity -> { |
| try { |
| fetchAssets(assetsDir, applicationId, collectionName, batchId, entities); |
| } catch (Exception e) { |
| logger.error("Exception while trying to fetch assets for app {}, collection {}, batch {} ", |
| applicationId, collectionName, batchId, e); |
| } |
| }); |
| } |
| entityObs.connect(); |
| } |
| writeEntities(collectionDir, entities, batchId, collectionName, gson); |
| }); |
| |
| } catch (Exception e) { |
| logger.error("There was an error writing entity ids to file for " |
| + edgeScopes.get(0).getEdge(), e); |
| // since entity id writing has failed, we need to see how we can not exit the |
| // whole program |
| System.exit(0); |
| } |
| }); |
| |
| logger.info("Finished fetching details for collection {} for batch {}", collectionName, batchId); |
| }); |
| logger.info("Exiting extractEntityIdsForCollection() method."); |
| } |
| |
| private void fetchAssets(File assetsDir, UUID applicationId, String collectionName, Integer batchId, |
| Results entities) throws Exception { |
| |
| List<Entity> entitiesWithAssets = new ArrayList<>(); |
| |
| for (Entity e : entities.getEntities()) { |
| if (e.getProperty("file-metadata") != null) { |
| entitiesWithAssets.add(e); |
| } |
| } |
| |
| if (!entitiesWithAssets.isEmpty()) { |
| |
| writeAssets(assetsDir, collectionName, batchId, entitiesWithAssets); |
| |
| ConnectableObservable<Entity> entityAssets = Observable.from(entitiesWithAssets).publish(); |
| entityAssets.subscribeOn(Schedulers.from(assetsFetcher)); |
| entityAssets.subscribe(e -> { |
| // Write code to fetch these assets from entity store. |
| BinaryStore binaryStore = null; |
| try { |
| binaryStore = binaryStoreFactory |
| .getBinaryStore(properties.getProperty(PROPERTIES_USERGRID_BINARY_UPLOADER)); |
| } catch (Exception e2) { |
| logger.error("Except on while trying to get binary store for property {}, ", properties.getProperty(PROPERTIES_USERGRID_BINARY_UPLOADER), e2 ); |
| } |
| |
| File file = new File(assetsDir + "/" + collectionName + "_assets_" + e.getUuid()); |
| try (InputStream in = binaryStore.read(applicationId, e); |
| OutputStream out = new BufferedOutputStream(new FileOutputStream(file));) { |
| |
| int read = -1; |
| |
| while ((read = in.read()) != -1) { |
| out.write(read); |
| } |
| |
| } catch (Exception e1) { |
| logger.error("Exception while to write assets file for entity {}", e.getUuid(), e1); |
| } |
| |
| }); |
| entityAssets.connect(); |
| } |
| } |
| |
| private void writeAssets(final File collectionDir, final String collectionName, final Integer batchId, |
| List<Entity> entitiesWithAssets2) { |
| |
| try (BufferedWriter assetsWriter = new BufferedWriter( |
| new FileWriter(new File(collectionDir + "/" + collectionName + "_assets_" + batchId + ".json")));) { |
| for (Entity e : entitiesWithAssets2) { |
| JSONObject object = new JSONObject(); |
| object.put("uuid", e.getUuid()); |
| object.put("type", e.getType()); |
| object.put("file-metadata", e.getProperty("file-metadata")); |
| object.put("file", (e.getProperty("file") != null) ? e.getProperty("file") : null); |
| assetsWriter.write(object.toString()); |
| assetsWriter.newLine(); |
| } |
| } catch (Exception ex) { |
| logger.error("Exception while trying to write entities collection {} batch {}", collectionName, batchId, |
| ex); |
| } |
| } |
| |
| private void fetchDictionaries(File collectionDir, String collectionName, final EntityManager rootEm, |
| Results entity, Gson gson, Integer batchId) { |
| |
| //TODO : still using JsonGenerator |
| JsonGenerator jgDictionaries = null; |
| try { |
| jgDictionaries = getJsonGenerator(new File(collectionDir + "/" + collectionName + "_" + "dictionaries_" + batchId)); |
| |
| for (Entity et : entity.getEntities()) { |
| Set<String> dictionaries; |
| try { |
| dictionaries = rootEm.getDictionaries(et); |
| |
| jgDictionaries.writeStartArray(); |
| if (dictionaries != null && !dictionaries.isEmpty()) { |
| for (String dictionary : dictionaries) { |
| Map<Object, Object> dict = rootEm.getDictionaryAsMap(et, dictionary); |
| if (dict != null && dict.isEmpty()) { |
| continue; |
| } |
| |
| jgDictionaries.writeStartObject(); |
| jgDictionaries.writeObjectField("Entity", et.getUuid()); |
| jgDictionaries.writeObjectField("EntityType", et.getType()); |
| jgDictionaries.writeObjectField(dictionary, dict); |
| jgDictionaries.writeEndObject(); |
| } |
| jgDictionaries.writeEndArray(); |
| } |
| } catch (Exception e) { |
| logger.error("Exception while trying to fetch dictionaries.", e); |
| } |
| } |
| } catch (Exception e) { |
| logger.error("Exception while trying to fetch dictionaries.", e); |
| } finally { |
| if (jgDictionaries != null) { |
| try { |
| jgDictionaries.close(); |
| } catch (IOException e) { |
| logger.error("Exception while trying to close dictionaries writer.", e); |
| } |
| } |
| } |
| } |
| |
| private void fetchConnections(GraphManager gm, final EntityCollectionManager ecm, Results entity, |
| File collectionDir, String collectionName, Integer batchId, Gson gson) { |
| |
| try(BufferedWriter bufferedWriter = new BufferedWriter( |
| new FileWriter(new File(collectionDir + "/" + collectionName + "_" + "connections_" + batchId)));){ |
| |
| for (Entity et : entity.getEntities()) { |
| |
| List<ConnectionPojo> connections = new ArrayList<>(); |
| |
| SimpleId id = new SimpleId(); |
| id.setType(et.getType()); |
| id.setUuid(et.getUuid()); |
| |
| gm.getEdgeTypesFromSource(CpNamingUtils.createConnectionTypeSearch(id)) |
| .flatMap(emittedEdgeType -> { |
| logger.debug("loading edges of type {} from node {}", emittedEdgeType, id); |
| return gm.loadEdgesFromSource(new SimpleSearchByEdgeType(id, emittedEdgeType, |
| Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent())); |
| }).map(markedEdge -> { |
| |
| if (!markedEdge.isDeleted() && !markedEdge.isTargetNodeDeleted() |
| && markedEdge.getTargetNode() != null) { |
| |
| // doing the load to just again make sure bad |
| // connections are not exported |
| org.apache.usergrid.persistence.model.entity.Entity en = ecm |
| .load(markedEdge.getTargetNode()).toBlocking().lastOrDefault(null); |
| |
| if (en != null) { |
| |
| try { |
| |
| ConnectionPojo connectionPojo = new ConnectionPojo(); |
| connectionPojo.setRelationship(CpNamingUtils |
| .getConnectionNameFromEdgeName(markedEdge.getType())); |
| connectionPojo.setSourceNodeUUID(markedEdge.getSourceNode().getUuid().toString()); |
| connectionPojo.setTargetNodeUUID(markedEdge.getTargetNode().getUuid().toString()); |
| |
| connections.add(connectionPojo); |
| |
| } catch (Exception e) { |
| logger.error("Exception while trying process connection entity", e); |
| } |
| } else { |
| logger.warn( |
| "Exported connection has a missing target node, not creating connection in export. Edge: {}", |
| markedEdge); |
| } |
| } |
| return null; |
| |
| }).toBlocking().lastOrDefault(null); |
| |
| for(ConnectionPojo c : connections) { |
| bufferedWriter.write(gson.toJson(c)); |
| bufferedWriter.newLine(); |
| } |
| } |
| }catch (Exception e) { |
| logger.error("Exception while trying to write connection to file.", e); |
| } |
| logger.info("Finished fetching details for collection {} batch {}", collectionName, batchId); |
| } |
| |
| |
| class ConnectionPojo { |
| private String sourceNodeUUID; |
| private String relationship; |
| private String targetNodeUUID; |
| public String getSourceNodeUUID() { |
| return sourceNodeUUID; |
| } |
| public void setSourceNodeUUID(String sourceNodeUUID) { |
| this.sourceNodeUUID = sourceNodeUUID; |
| } |
| public String getRelationship() { |
| return relationship; |
| } |
| public void setRelationship(String relationship) { |
| this.relationship = relationship; |
| } |
| public String getTargetNodeUUID() { |
| return targetNodeUUID; |
| } |
| public void setTargetNodeUUID(String targetNodeUUID) { |
| this.targetNodeUUID = targetNodeUUID; |
| } |
| |
| } |
| |
| private void writeEntities(File collectionDir, Results entities, Integer batchId, String collectionName, Gson gson) { |
| logger.info("Started writing entities for collection {} batch {} ", collectionName, batchId); |
| |
| try(BufferedWriter bufferedWriter = new BufferedWriter( |
| new FileWriter(new File(collectionDir + "/" + collectionName + "_data_" + batchId + ".json")));) { |
| |
| logger.info("Got count {} entities for file writing", entities.getEntities().size()); |
| for(Entity e : entities.getEntities()) { |
| bufferedWriter.write(gson.toJson(e)); |
| bufferedWriter.newLine(); |
| } |
| |
| } catch (Exception e) { |
| logger.error("Exception while trying to write entities collection {} batch {}", collectionName, batchId, e); |
| } |
| |
| logger.info("Finised writing entities for collection {} batch {} ", collectionName, batchId); |
| } |
| |
| private void writeEntityIdsBatch(File collectionDir, List<EdgeScope> edgeScopes, Integer batchId, |
| String collectionName) throws Exception { |
| logger.info("Started writing ids for collection {} batch {} ", collectionName, batchId); |
| try (BufferedWriter bufferedWriter = new BufferedWriter( |
| new FileWriter(new File(collectionDir + "/" + collectionName + "_" + batchId)));) { |
| for (EdgeScope es : edgeScopes) { |
| bufferedWriter.write(es.getEdge().toString()); |
| bufferedWriter.newLine(); |
| } |
| } catch (Exception e) { |
| logger.error("Exception while tryign to write entity ids for collection {} batch {}", collectionName, batchId, e); |
| } |
| logger.info("Finished writing ids for collection {} batch {} ", collectionName, batchId); |
| } |
| |
| } |