blob: e45820aa9c27639f049c8a24fb3ac6acce2591e8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.usergrid.management.export;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.batch.JobExecution;
import org.apache.usergrid.batch.service.SchedulerService;
import org.apache.usergrid.management.ApplicationInfo;
import org.apache.usergrid.management.ManagementService;
import org.apache.usergrid.persistence.ConnectionRef;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.EntityManagerFactory;
import org.apache.usergrid.persistence.PagingResultsIterator;
import org.apache.usergrid.persistence.Results;
import org.apache.usergrid.persistence.SimpleEntityRef;
import org.apache.usergrid.persistence.entities.Export;
import org.apache.usergrid.persistence.entities.JobData;
import org.apache.usergrid.persistence.Query;
import org.apache.usergrid.persistence.Query.Level;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.BiMap;
/**
* Need to refactor out the mutliple orgs being take , need to factor out the multiple apps it will just be the one app
* and the one org and all of it's collections.
*/
public class ExportServiceImpl implements ExportService {
private static final Logger logger = LoggerFactory.getLogger( ExportServiceImpl.class );
public static final String EXPORT_ID = "exportId";
public static final String EXPORT_JOB_NAME = "exportJob";
//dependency injection
private SchedulerService sch;
//injected the Entity Manager Factory
protected EntityManagerFactory emf;
//inject Management Service to access Organization Data
private ManagementService managementService;
//Maximum amount of entities retrieved in a single go.
public static final int MAX_ENTITY_FETCH = 1000;
//Amount of time that has passed before sending another heart beat in millis
public static final int TIMESTAMP_DELTA = 5000;
private JsonFactory jsonFactory = new JsonFactory();
@Override
public UUID schedule( final Map<String, Object> config ) throws Exception {
if ( config == null ) {
logger.error( "export information cannot be null" );
return null;
}
EntityManager em;
try {
em = emf.getEntityManager( emf.getManagementAppId() );
Set<String> collections = em.getApplicationCollections();
if ( !collections.contains( "exports" ) ) {
em.createApplicationCollection( "exports" );
}
}
catch ( Exception e ) {
logger.error( "application doesn't exist within the current context" );
return null;
}
Export export = new Export();
//update state
try {
export = em.create( export );
}
catch ( Exception e ) {
logger.error( "Export entity creation failed" );
return null;
}
export.setState( Export.State.CREATED );
em.update( export );
//set data to be transferred to exportInfo
JobData jobData = new JobData();
jobData.setProperty( "exportInfo", config );
jobData.setProperty( EXPORT_ID, export.getUuid() );
long soonestPossible = System.currentTimeMillis() + 250; //sch grace period
//schedule job
sch.createJob( EXPORT_JOB_NAME, soonestPossible, jobData );
//update state
export.setState( Export.State.SCHEDULED );
em.update( export );
return export.getUuid();
}
/**
* Query Entity Manager for the string state of the Export Entity. This corresponds to the GET /export
*
* @return String
*/
@Override
public String getState( final UUID uuid ) throws Exception {
if ( uuid == null ) {
logger.error( "UUID passed in cannot be null." );
return "UUID passed in cannot be null";
}
EntityManager rootEm = emf.getEntityManager( emf.getManagementAppId() );
//retrieve the export entity.
Export export = rootEm.get( uuid, Export.class );
if ( export == null ) {
logger.error( "no entity with that uuid was found" );
return "No Such Element found";
}
return export.getState().toString();
}
@Override
public String getErrorMessage( final UUID appId, final UUID uuid ) throws Exception {
//get application entity manager
if ( appId == null ) {
logger.error( "Application context cannot be found." );
return "Application context cannot be found.";
}
if ( uuid == null ) {
logger.error( "UUID passed in cannot be null." );
return "UUID passed in cannot be null";
}
EntityManager rootEm = emf.getEntityManager( appId );
//retrieve the export entity.
Export export = rootEm.get( uuid, Export.class );
if ( export == null ) {
logger.error( "no entity with that uuid was found" );
return "No Such Element found";
}
return export.getErrorMessage();
}
@Override
public void doExport( final JobExecution jobExecution ) throws Exception {
@SuppressWarnings("unchecked")
Map<String, Object> config = ( Map<String, Object> ) jobExecution.getJobData().getProperty( "exportInfo" );
Object s3PlaceHolder = jobExecution.getJobData().getProperty( "s3Export" );
S3Export s3Export;
if ( config == null ) {
logger.error( "Export Information passed through is null" );
return;
}
//get the entity manager for the application, and the entity that this Export corresponds to.
UUID exportId = ( UUID ) jobExecution.getJobData().getProperty( EXPORT_ID );
EntityManager em = emf.getEntityManager( emf.getManagementAppId() );
Export export = em.get( exportId, Export.class );
//update the entity state to show that the job has officially started.
export.setState( Export.State.STARTED );
em.update( export );
try {
if ( s3PlaceHolder != null ) {
s3Export = ( S3Export ) s3PlaceHolder;
}
else {
s3Export = new S3ExportImpl();
}
}
catch ( Exception e ) {
logger.error( "S3Export doesn't exist" );
export.setErrorMessage( e.getMessage() );
export.setState( Export.State.FAILED );
em.update( export );
return;
}
if ( config.get( "organizationId" ) == null ) {
logger.error( "doExport: No organization could be found" );
export.setState( Export.State.FAILED );
em.update( export );
return;
}
else if ( config.get( "applicationId" ) == null ) {
//exports All the applications from an organization
try {
exportApplicationsFromOrg( ( UUID ) config.get( "organizationId" ), config, jobExecution, s3Export );
}
catch ( Exception e ) {
export.setErrorMessage( e.getMessage() );
export.setState( Export.State.FAILED );
em.update( export );
return;
}
}
else if ( config.get( "collectionName" ) == null ) {
//exports an Application from a single organization
try {
exportApplicationFromOrg( ( UUID ) config.get( "organizationId" ),
( UUID ) config.get( "applicationId" ), config, jobExecution, s3Export );
}
catch ( Exception e ) {
export.setErrorMessage( e.getMessage() );
export.setState( Export.State.FAILED );
em.update( export );
return;
}
}
else {
try {
//exports a single collection from an app org combo
try {
exportCollectionFromOrgApp( ( UUID ) config.get( "applicationId" ), config, jobExecution,
s3Export );
}
catch ( Exception e ) {
export.setErrorMessage( e.getMessage() );
export.setState( Export.State.FAILED );
em.update( export );
return;
}
}
catch ( Exception e ) {
//if for any reason the backing up fails, then update the entity with a failed state.
export.setErrorMessage( e.getMessage() );
export.setState( Export.State.FAILED );
em.update( export );
return;
}
}
export.setState( Export.State.FINISHED );
em.update( export );
}
public SchedulerService getSch() {
return sch;
}
public void setSch( final SchedulerService sch ) {
this.sch = sch;
}
public EntityManagerFactory getEmf() {
return emf;
}
public void setEmf( final EntityManagerFactory emf ) {
this.emf = emf;
}
public ManagementService getManagementService() {
return managementService;
}
public void setManagementService( final ManagementService managementService ) {
this.managementService = managementService;
}
public Export getExportEntity( final JobExecution jobExecution ) throws Exception {
UUID exportId = ( UUID ) jobExecution.getJobData().getProperty( EXPORT_ID );
EntityManager exportManager = emf.getEntityManager( emf.getManagementAppId() );
return exportManager.get( exportId, Export.class );
}
/**
* Exports All Applications from an Organization
*/
private void exportApplicationsFromOrg( UUID organizationUUID, final Map<String, Object> config,
final JobExecution jobExecution, S3Export s3Export ) throws Exception {
//retrieves export entity
Export export = getExportEntity( jobExecution );
String appFileName = null;
BiMap<UUID, String> applications = managementService.getApplicationsForOrganization( organizationUUID, true);
for ( Map.Entry<UUID, String> application : applications.entrySet() ) {
if ( application.getValue().equals(
managementService.getOrganizationByUuid( organizationUUID ).getName() + "/exports" ) ) {
continue;
}
appFileName = prepareOutputFileName( application.getValue(), null );
File ephemeral = collectionExportAndQuery( application.getKey(), config, export, jobExecution );
fileTransfer( export, appFileName, ephemeral, config, s3Export );
}
}
public void fileTransfer( Export export, String appFileName, File ephemeral, Map<String, Object> config,
S3Export s3Export ) {
try {
s3Export.copyToS3( ephemeral, config, appFileName );
}
catch ( Exception e ) {
export.setErrorMessage( e.getMessage() );
export.setState( Export.State.FAILED );
return;
}
}
/**
* Exports a specific applications from an organization
*/
private void exportApplicationFromOrg( UUID organizationUUID, UUID applicationId, final Map<String, Object> config,
final JobExecution jobExecution, S3Export s3Export ) throws Exception {
//retrieves export entity
Export export = getExportEntity( jobExecution );
ApplicationInfo application = managementService.getApplicationInfo( applicationId );
String appFileName = prepareOutputFileName( application.getName(), null );
File ephemeral = collectionExportAndQuery(applicationId, config, export, jobExecution);
fileTransfer( export, appFileName, ephemeral, config, s3Export );
}
/**
* Exports a specific collection from an org-app combo.
*/
//might be confusing, but uses the /s/ inclusion or exclusion nomenclature.
private void exportCollectionFromOrgApp( UUID applicationUUID, final Map<String, Object> config,
final JobExecution jobExecution, S3Export s3Export ) throws Exception {
//retrieves export entity
Export export = getExportEntity( jobExecution );
ApplicationInfo application = managementService.getApplicationInfo( applicationUUID );
String appFileName = prepareOutputFileName( application.getName(), ( String ) config.get( "collectionName" ) );
File ephemeral = collectionExportAndQuery( applicationUUID, config, export, jobExecution );
fileTransfer( export, appFileName, ephemeral, config, s3Export );
}
/**
* Regulates how long to wait until the next heartbeat.
*/
public long checkTimeDelta( long startingTime, final JobExecution jobExecution ) {
long cur_time = System.currentTimeMillis();
if ( startingTime <= ( cur_time - TIMESTAMP_DELTA ) ) {
jobExecution.heartbeat();
return cur_time;
}
return startingTime;
}
/**
* Serialize and save the collection members of this <code>entity</code>
*
* @param em Entity Manager
* @param collection Collection Name
* @param entity entity
*/
private void saveCollectionMembers( JsonGenerator jg, EntityManager em, String collection, Entity entity )
throws Exception {
// Write connections
saveConnections( entity, em, jg );
// Write dictionaries
saveDictionaries( entity, em, jg );
Set<String> collections = em.getCollections( entity );
// If your application doesn't have any e
if ( ( collections == null ) || collections.isEmpty() ) {
return;
}
for ( String collectionName : collections ) {
if ( collectionName.equals( collection ) ) {
jg.writeFieldName( collectionName );
jg.writeStartArray();
//is 100000 an arbitary number?
Results collectionMembers =
em.getCollection( entity, collectionName, null, 100000, Level.IDS, false );
List<UUID> entityIds = collectionMembers.getIds();
if ( ( entityIds != null ) && !entityIds.isEmpty() ) {
for ( UUID childEntityUUID : entityIds ) {
jg.writeObject( childEntityUUID.toString() );
}
}
// End collection array.
jg.writeEndArray();
}
}
}
/**
* Persists the connection for this entity.
*/
private void saveDictionaries( Entity entity, EntityManager em, JsonGenerator jg ) throws Exception {
jg.writeFieldName( "dictionaries" );
jg.writeStartObject();
Set<String> dictionaries = em.getDictionaries( entity );
for ( String dictionary : dictionaries ) {
Map<Object, Object> dict = em.getDictionaryAsMap( entity, dictionary );
// nothing to do
if ( dict.isEmpty() ) {
continue;
}
jg.writeFieldName( dictionary );
jg.writeStartObject();
for ( Map.Entry<Object, Object> entry : dict.entrySet() ) {
jg.writeFieldName( entry.getKey().toString() );
jg.writeObject( entry.getValue() );
}
jg.writeEndObject();
}
jg.writeEndObject();
}
/**
* Persists the connection for this entity.
*/
private void saveConnections( Entity entity, EntityManager em, JsonGenerator jg ) throws Exception {
jg.writeFieldName( "connections" );
jg.writeStartObject();
Set<String> connectionTypes = em.getConnectionTypes( entity );
for ( String connectionType : connectionTypes ) {
jg.writeFieldName( connectionType );
jg.writeStartArray();
Results results = em.getTargetEntities(
new SimpleEntityRef(entity.getType(), entity.getUuid()),
connectionType, null, Level.IDS);
List<ConnectionRef> connections = results.getConnections();
for ( ConnectionRef connectionRef : connections ) {
jg.writeObject( connectionRef.getTargetRefs().getUuid() );
}
jg.writeEndArray();
}
jg.writeEndObject();
}
protected JsonGenerator getJsonGenerator( File ephermal ) throws IOException {
//TODO:shouldn't the below be UTF-16?
JsonGenerator jg = jsonFactory.createJsonGenerator( ephermal, JsonEncoding.UTF8 );
jg.setPrettyPrinter( new DefaultPrettyPrinter( ) );
jg.setCodec( new ObjectMapper() );
return jg;
}
/**
* @return the file name concatenated with the type and the name of the collection
*/
public String prepareOutputFileName( String applicationName, String CollectionName ) {
StringBuilder str = new StringBuilder();
str.append( applicationName );
str.append( "." );
if ( CollectionName != null ) {
str.append( CollectionName );
str.append( "." );
}
str.append( System.currentTimeMillis() );
str.append( ".json" );
String outputFileName = str.toString();
return outputFileName;
}
/**
* handles the query and export of collections
*/
//TODO:Needs further refactoring.
protected File collectionExportAndQuery( UUID applicationUUID, final Map<String, Object> config, Export export,
final JobExecution jobExecution ) throws Exception {
EntityManager em = emf.getEntityManager( applicationUUID );
Map<String, Object> metadata = em.getApplicationCollectionMetadata();
long starting_time = System.currentTimeMillis();
File ephemeral = new File( "tempExport" + UUID.randomUUID() );
ephemeral.deleteOnExit();
JsonGenerator jg = getJsonGenerator( ephemeral );
jg.writeStartObject();
jg.writeObjectFieldStart( "collections" );
for ( String collectionName : metadata.keySet() ) {
if ( collectionName.equals( "exports" ) ) {
continue;
}
//if the collection you are looping through doesn't match the name of the one you want. Don't export it.
if ( ( config.get( "collectionName" ) == null ) || collectionName.equalsIgnoreCase((String)config.get( "collectionName" ) ) ) {
//write out the collection name at the start of the file
jg.writeArrayFieldStart( collectionName.toLowerCase() );
//Query entity manager for the entities in a collection
Query query = null;
if ( config.get( "query" ) == null ) {
query = new Query();
}
else {
try {
query = Query.fromQL( ( String ) config.get( "query" ) );
}
catch ( Exception e ) {
export.setErrorMessage( e.getMessage() );
}
}
query.setLimit( MAX_ENTITY_FETCH );
query.setResultsLevel( Level.ALL_PROPERTIES );
query.setCollection( collectionName );
Results entities = em.searchCollection( em.getApplicationRef(), collectionName, query );
//pages through the query and backs up all results.
PagingResultsIterator itr = new PagingResultsIterator( entities );
for ( Object e : itr ) {
starting_time = checkTimeDelta( starting_time, jobExecution );
Entity entity = ( Entity ) e;
jg.writeStartObject();
jg.writeFieldName( "Metadata" );
jg.writeObject( entity );
saveCollectionMembers( jg, em, ( String ) config.get( "collectionName" ), entity );
jg.writeEndObject();
jg.flush();
}
//write out the end collection
jg.writeEndArray();
}
}
jg.writeEndObject();
jg.writeEndObject();
jg.flush();
jg.close();
return ephemeral;
}
}