Merge commit 'refs/pull/596/head' of github.com:apache/usergrid
diff --git a/stack/README.md b/stack/README.md
index 268cb7e..6166da2 100644
--- a/stack/README.md
+++ b/stack/README.md
@@ -3,7 +3,7 @@
A highly-scalable data platform for mobile applications.
* **Documentation**: http://usergrid.apache.org/docs/
-* **Homepage**: http://http://usergrid.apache.org/
+* **Homepage**: http://usergrid.apache.org/
## Requirements
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 909c073..841e978 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -181,6 +181,8 @@
bind( ApplicationService.class ).to( ApplicationServiceImpl.class );
bind( StatusService.class ).to( StatusServiceImpl.class );
+
+ bind(ApplicationRestorePasswordService.class).to(ApplicationRestorePasswordServiceImpl.class);
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 7a4c781..0681b7a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -638,7 +638,9 @@
Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() );
- if ( !skipIndexingForType( entityId.getType() ) ) {
+ // may still want to delete index entries even if indexing is turned off for new updates
+ if ( entityManagerFig.deindexDeletedWhenCollectionIndexingOff() ||
+ !skipIndexingForType( entityId.getType() ) ) {
indexService.queueEntityDelete( applicationScope, entityId );
}
@@ -1244,8 +1246,15 @@
@Override
public Map<Object, Object> getDictionaryAsMap( EntityRef entity, String dictionaryName ) throws Exception {
+ // Two-argument form: forward to the three-argument overload with forceVerification set to true.
+ return getDictionaryAsMap(entity, dictionaryName, true);
+ }
- entity = validate( entity );
+
+ @Override
+ public Map<Object, Object> getDictionaryAsMap( EntityRef entity, String dictionaryName,
+ boolean forceVerification) throws Exception {
+
+ entity = validate( entity, forceVerification);
Map<Object, Object> dictionary = new LinkedHashMap<Object, Object>();
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
index 3c8a53f..23cf1c3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
@@ -42,6 +42,10 @@
@Default( "false" )
boolean getDeindexOnUpdate();
+ /**
+ * When true, deleting an entity still queues removal of its index entries even if
+ * indexing is skipped for that entity's type. Defaults to true.
+ */
+ @Key( "usergrid.entityManager.deindex_deleted_when_collection_indexing_off")
+ @Default( "true" )
+ boolean deindexDeletedWhenCollectionIndexingOff();
+
/**
* Comma-separated list of one or more Amazon regions to use if multiregion
* is set to true.
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index 1eb5e03..dbec084 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -301,14 +301,14 @@
return indexService.deIndexOldVersions( applicationScope, entityId,
- getVersionsOlderThanOrEqualToMarked(ecm, entityId, markedVersion));
+ getVersionsOlderThanMarked(ecm, entityId, markedVersion));
}
- private List<UUID> getVersionsOlderThanOrEqualToMarked(final EntityCollectionManager ecm,
- final Id entityId, final UUID markedVersion ){
+ private List<UUID> getVersionsOlderThanMarked(final EntityCollectionManager ecm, final Id entityId,
+ final UUID markedVersion ){
final List<UUID> versions = new ArrayList<>();
@@ -317,7 +317,7 @@
ecm.getVersionsFromMaxToMin( entityId, markedVersion)
.take(100)
.forEach( mvccLogEntry -> {
- if ( mvccLogEntry.getVersion().timestamp() <= markedVersion.timestamp() ) {
+ if ( mvccLogEntry.getVersion().timestamp() < markedVersion.timestamp() ) {
versions.add(mvccLogEntry.getVersion());
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 32470f6..b1d493e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -199,6 +199,11 @@
Object fields = jsonMapData.get("fields");
+ // if "fields" field doesn't exist, should treat like fields=all
+ if ( fields == null ) {
+ return Optional.absent();
+ }
+
if ( fields != null && fields instanceof String && "all".equalsIgnoreCase(fields.toString())) {
return Optional.absent();
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
index b9238e5..48c3908 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
@@ -20,6 +20,8 @@
package org.apache.usergrid.corepersistence.index;
+import org.apache.usergrid.utils.StringUtils;
+
/**
* An interface for re-indexing all entities in an application
*/
@@ -47,6 +49,13 @@
*/
ReIndexStatus getStatus( final String jobId );
+ /**
+ * Get the status of the re-index job for a single collection.
+ *
+ * @param appIdString identifier of the application that owns the collection, in string form
+ * @param collectionName name of the collection whose rebuild status is requested
+ * @return the re-index status recorded for that application/collection pair
+ */
+ ReIndexStatus getStatusForCollection( final String appIdString, final String collectionName );
+
/**
* The response when requesting a re-index operation
@@ -56,14 +65,27 @@
final Status status;
final long numberProcessed;
final long lastUpdated;
+ final String collectionName;
+ /**
+ * Create a status snapshot. A jobId or collectionName that StringUtils.isNotEmpty
+ * rejects is stored as the empty string.
+ */
public ReIndexStatus( final String jobId, final Status status, final long numberProcessed,
- final long lastUpdated ) {
- this.jobId = jobId;
+ final long lastUpdated, final String collectionName ) {
+
+ // collection-scoped statuses are created with an empty jobId; store "" rather than the raw value
+ if(StringUtils.isNotEmpty(jobId)){
+ this.jobId = jobId;
+ }else {
+ this.jobId = "";
+ }
+
this.status = status;
this.numberProcessed = numberProcessed;
this.lastUpdated = lastUpdated;
+
+ // job-scoped statuses are created with an empty collectionName; store "" rather than the raw value
+ if(StringUtils.isNotEmpty(collectionName)){
+ this.collectionName = collectionName;
+ }else {
+ this.collectionName = "";
+ }
}
@@ -74,6 +96,13 @@
return jobId;
}
+ /**
+ * Get the name of the collection this status was reported for. The constructor stores an
+ * empty string when StringUtils.isNotEmpty rejects the supplied name.
+ */
+ public String getCollectionName() {
+ return collectionName;
+ }
+
/**
* Get the last updated time, as a long
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index 05602fc..036f89c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -75,6 +75,7 @@
private static final String MAP_COUNT_KEY = "count";
private static final String MAP_STATUS_KEY = "status";
private static final String MAP_UPDATED_KEY = "lastUpdated";
+ private static final String MAP_SEPARATOR = "|||";
private final AllApplicationsObservable allApplicationsObservable;
@@ -140,7 +141,9 @@
// create an observable that loads a batch to be indexed
- if(reIndexRequestBuilder.getCollectionName().isPresent()) {
+ final boolean isForCollection = reIndexRequestBuilder.getCollectionName().isPresent();
+
+ if(isForCollection) {
String collectionName = InflectionUtils.pluralize(
CpNamingUtils.getNameFromEdgeType(reIndexRequestBuilder.getCollectionName().get() ));
@@ -175,12 +178,36 @@
if( edgeScopes.size() > 0 ) {
writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1));
}
- writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() ); })
- .doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() ))
+ if( isForCollection ){
+ writeStateMetaForCollection(
+ appId.get().getApplication().getUuid().toString(),
+ reIndexRequestBuilder.getCollectionName().get(),
+ Status.INPROGRESS, count.get(),
+ System.currentTimeMillis() );
+ }else{
+ writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() );
+ }
+ })
+ .doOnCompleted(() ->{
+ if( isForCollection ){
+ writeStateMetaForCollection(
+ appId.get().getApplication().getUuid().toString(),
+ reIndexRequestBuilder.getCollectionName().get(),
+ Status.COMPLETE, count.get(),
+ System.currentTimeMillis() );
+ }else {
+ writeStateMeta(jobId, Status.COMPLETE, count.get(), System.currentTimeMillis());
+ }
+ })
.subscribeOn( Schedulers.io() ).subscribe();
+ if(isForCollection){
+ return new ReIndexStatus( "", Status.STARTED, 0, 0, CpNamingUtils.getNameFromEdgeType(reIndexRequestBuilder.getCollectionName().get()) );
- return new ReIndexStatus( jobId, Status.STARTED, 0, 0 );
+ }
+
+
+ return new ReIndexStatus( jobId, Status.STARTED, 0, 0, "" );
}
@@ -196,38 +223,15 @@
return getIndexResponse( jobId );
}
-
- /**
- * Simple collector that counts state, then flushed every time a buffer is provided. Writes final state when complete
- */
- private class FlushingCollector {
-
- private final String jobId;
- private long count;
-
-
- private FlushingCollector( final String jobId ) {
- this.jobId = jobId;
- }
-
-
- public void flushBuffer( final List<EdgeScope> buffer ) {
- count += buffer.size();
-
- //write our cursor state
- if ( buffer.size() > 0 ) {
- writeCursorState( jobId, buffer.get( buffer.size() - 1 ) );
- }
-
- writeStateMeta( jobId, Status.INPROGRESS, count, System.currentTimeMillis() );
- }
-
- public void complete(){
- writeStateMeta( jobId, Status.COMPLETE, count, System.currentTimeMillis() );
- }
+ /**
+ * Look up the re-index status recorded for one collection of an application.
+ *
+ * @param appIdString application id in string form; must not be null
+ * @param collectionName name of the collection; must not be null
+ * @return the status produced by getIndexResponseForCollection for that pair
+ * @throws NullPointerException if either argument is null
+ */
+ @Override
+ public ReIndexStatus getStatusForCollection( final String appIdString, final String collectionName ) {
+ // each precondition checks the argument its message names
+ Preconditions.checkNotNull( appIdString, "appIdString must not be null" );
+ Preconditions.checkNotNull( collectionName, "collectionName must not be null" );
+ return getIndexResponseForCollection( appIdString, collectionName );
}
+
/**
* Get the resume edge scope
*
@@ -346,7 +350,7 @@
final String stringStatus = mapManager.getString( jobId+MAP_STATUS_KEY );
if(stringStatus == null){
- return new ReIndexStatus( jobId, Status.UNKNOWN, 0, 0 );
+ return new ReIndexStatus( jobId, Status.UNKNOWN, 0, 0, "" );
}
final Status status = Status.valueOf( stringStatus );
@@ -354,7 +358,41 @@
final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY );
final long lastUpdated = mapManager.getLong( jobId + MAP_UPDATED_KEY );
- return new ReIndexStatus( jobId, status, processedCount, lastUpdated );
+ return new ReIndexStatus( jobId, status, processedCount, lastUpdated, "" );
+ }
+
+
+ /**
+ * Persist the progress of a per-collection re-index. All three values are stored under keys of
+ * the form appIdString + MAP_SEPARATOR + edge type of the lower-cased collection name + suffix.
+ *
+ * @param appIdString application id in string form, first part of every key
+ * @param collectionName collection being re-indexed
+ * @param status status whose name is stored under the status key
+ * @param processedCount value stored under the count key
+ * @param lastUpdated value stored under the last-updated key
+ */
+ private void writeStateMetaForCollection(final String appIdString, final String collectionName,
+ final Status status, final long processedCount, final long lastUpdated ) {
+
+ final String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( collectionName.toLowerCase() );
+ final String keyPrefix = appIdString + MAP_SEPARATOR + edgeType;
+
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Flushing state for collection {}, status {}, processedCount {}, lastUpdated {}",
+ collectionName, status, processedCount, lastUpdated );
+ }
+
+ mapManager.putString( keyPrefix + MAP_STATUS_KEY, status.name() );
+ mapManager.putLong( keyPrefix + MAP_COUNT_KEY, processedCount );
+ mapManager.putLong( keyPrefix + MAP_UPDATED_KEY, lastUpdated );
+ }
+
+
+ /**
+ * Read back the per-collection re-index state stored under
+ * appIdString + MAP_SEPARATOR + edge type of the lower-cased collection name.
+ *
+ * @return a status with an empty jobId; Status.UNKNOWN with zero counters when no status string is stored
+ */
+ private ReIndexStatus getIndexResponseForCollection( final String appIdString, final String collectionName ) {
+
+ final String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( collectionName.toLowerCase() );
+ final String keyPrefix = appIdString + MAP_SEPARATOR + edgeType;
+
+ final String storedStatus = mapManager.getString( keyPrefix + MAP_STATUS_KEY );
+ if ( storedStatus == null ) {
+ // nothing recorded for this application/collection pair
+ return new ReIndexStatus( "", Status.UNKNOWN, 0, 0, collectionName );
+ }
+
+ // parse the status before reading the counters, as the counters are only read for a valid status
+ final Status parsedStatus = Status.valueOf( storedStatus );
+ final long processed = mapManager.getLong( keyPrefix + MAP_COUNT_KEY );
+ final long updatedAt = mapManager.getLong( keyPrefix + MAP_UPDATED_KEY );
+
+ return new ReIndexStatus( "", parsedStatus, processed, updatedAt, collectionName );
}
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
index 88b5001..7aa614a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
@@ -28,6 +28,10 @@
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import com.google.common.base.Optional;
+import org.apache.usergrid.persistence.index.query.ParsedQuery;
+import org.apache.usergrid.persistence.index.query.ParsedQueryBuilder;
+
+import static org.apache.usergrid.persistence.Query.MAX_LIMIT;
/**
@@ -43,16 +47,24 @@
// it can happen if ES was not updated or has yet to be updated.
private final boolean keepStaleEntries;
private String query;
+ private ParsedQuery parsedQuery;
public PipelineContext( final ApplicationScope applicationScope, final RequestCursor requestCursor, final int limit, final int id, boolean keepStaleEntries, String query ) {
this.applicationScope = applicationScope;
- this.requestCursor = requestCursor;
- this.limit = limit;
this.id = id;
this.keepStaleEntries = keepStaleEntries;
this.query = query;
+ // parse the query string once here; the result is exposed through getParsedQuery()
+ this.parsedQuery = ParsedQueryBuilder.build(query);
+ if (parsedQuery != null && parsedQuery.isDirectQuery()) {
+ // for direct query, use no limit or cursor: the caller's limit and cursor are ignored,
+ // the limit becomes MAX_LIMIT + 1 and the cursor is built from an absent value
+ this.limit = MAX_LIMIT + 1;
+ this.requestCursor = new RequestCursor(Optional.absent());
+ } else {
+ this.limit = limit;
+ this.requestCursor = requestCursor;
+ }
}
@@ -98,4 +110,8 @@
return query;
}
+ /**
+ * Get the query as parsed by ParsedQueryBuilder in the constructor.
+ * NOTE(review): the constructor null-checks this value, so callers should presumably expect null — confirm.
+ */
+ public ParsedQuery getParsedQuery() {
+ return this.parsedQuery;
+ }
+
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java
index 7a46507..cc409b0 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java
@@ -129,7 +129,7 @@
try {
final CandidateResults candidateResults =
- applicationEntityIndex.search( searchEdge, searchTypes, query, limit, currentOffSet,
+ applicationEntityIndex.search( searchEdge, searchTypes, pipelineContext.getParsedQuery(), limit, currentOffSet,
propertiesWithType, analyzeOnly, returnQuery);
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
index 20bcfe9..c0db02f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
@@ -21,13 +21,18 @@
import java.util.*;
+import java.util.logging.Filter;
import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
+import org.apache.usergrid.persistence.Schema;
import org.apache.usergrid.persistence.index.*;
import org.apache.usergrid.persistence.index.impl.IndexProducer;
+import org.apache.usergrid.persistence.index.query.ParsedQuery;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.model.field.DistanceField;
import org.apache.usergrid.persistence.model.field.EntityObjectField;
import org.apache.usergrid.persistence.model.field.Field;
+import org.apache.usergrid.persistence.model.field.StringField;
import org.apache.usergrid.persistence.model.field.value.EntityObject;
import org.apache.usergrid.utils.DateUtils;
import org.slf4j.Logger;
@@ -99,6 +104,8 @@
boolean keepStaleEntries = pipelineContext.getKeepStaleEntries();
String query = pipelineContext.getQuery();
+ ParsedQuery parsedQuery = pipelineContext.getParsedQuery();
+ boolean isDirectQuery = parsedQuery == null ? false : parsedQuery.isDirectQuery();
//buffer them to get a page size we can make 1 network hop
final Observable<FilterResult<Entity>> searchIdSetObservable =
@@ -107,7 +114,12 @@
//load them
.flatMap( candidateResults -> {
- //flatten toa list of ids to load
+ if (isDirectQuery) {
+ // add ids for direct query
+ updateDirectQueryCandidateResults(entityCollectionManager, candidateResults);
+ }
+
+ //flatten to a list of ids to load
final Observable<List<Candidate>> candidates =
Observable.from(candidateResults)
.map(filterResultCandidate -> filterResultCandidate.getValue()).toList();
@@ -118,12 +130,13 @@
Observable<EntitySet> entitySets = Observable.from(candidatesList)
.map(candidateEntry -> candidateEntry.getCandidateResult().getId()).toList()
.flatMap(idList -> entityCollectionManager.load(idList));
- //now we have a collection, validate our canidate set is correct.
+ //now we have a collection, validate our candidate set is correct.
return entitySets.map(
entitySet -> new EntityVerifier(
applicationIndex.createBatch(), entitySet, candidateResults,indexProducer)
)
- .doOnNext(entityCollector -> entityCollector.merge(keepStaleEntries, query))
+ .doOnNext(entityCollector -> entityCollector.merge(keepStaleEntries, query,
+ isDirectQuery))
.flatMap(entityCollector -> Observable.from(entityCollector.getResults()))
.map(entityFilterResult -> {
final Entity entity = entityFilterResult.getValue();
@@ -222,6 +235,28 @@
/**
+ * Resolve an entity Id for every direct-query candidate and store it on the candidate result.
+ * Name candidates are looked up through the collection manager's Schema.PROPERTY_NAME field;
+ * UUID candidates get an Id built from the UUID and entity type. Anything else is given a null Id.
+ */
+ private void updateDirectQueryCandidateResults(
+ EntityCollectionManager entityCollectionManager, List<FilterResult<Candidate>> candidatesList) {
+ for (final FilterResult<Candidate> filterResult : candidatesList) {
+ final CandidateResult directResult = filterResult.getValue().getCandidateResult();
+ final String type = directResult.getDirectEntityType();
+
+ Id resolvedId = null;
+ if (directResult.isDirectQueryName()) {
+ final StringField nameField =
+ new StringField( Schema.PROPERTY_NAME, directResult.getDirectEntityName() );
+ // blocks for the lookup; stays null when nothing is emitted for that name
+ resolvedId = entityCollectionManager.getIdField( type, nameField )
+ .toBlocking().lastOrDefault( null );
+ } else if (directResult.isDirectQueryUUID()) {
+ resolvedId = new SimpleId(directResult.getDirectEntityUUID(), type);
+ }
+ directResult.setId(resolvedId);
+ }
+ }
+
+
+ /**
* Our collector to collect entities. Not quite a true collector, but works within our operational
* flow as this state is mutable and difficult to represent functionally
*/
@@ -234,6 +269,7 @@
private final List<FilterResult<Candidate>> candidateResults;
private final IndexProducer indexProducer;
private final EntitySet entitySet;
+ private List<FilterResult<Candidate>> dedupedCandidateResults = new ArrayList<>();
public EntityVerifier( final EntityIndexBatch batch, final EntitySet entitySet,
@@ -250,14 +286,33 @@
/**
* Merge our candidates and our entity set into results
+ *
+ * @param keepStaleEntries passed through to validate for each surviving candidate
+ * @param query query string, passed through to filterDuplicateCandidates and validate
+ * @param isDirectQuery when true, candidates are de-duplicated by entity UUID here and the
+ * index batch is not submitted
*/
- public void merge(boolean keepStaleEntries, String query) {
+ public void merge(boolean keepStaleEntries, String query, boolean isDirectQuery) {
- for ( final FilterResult<Candidate> candidateResult : candidateResults ) {
- validate( candidateResult , keepStaleEntries, query);
+ if (!isDirectQuery) {
+ filterDuplicateCandidates(query);
+ } else {
+ // remove direct query duplicates or missing entities (names that don't exist will have null ids)
+ Set<UUID> foundUUIDs = new HashSet<>();
+ for (FilterResult<Candidate> candidateFilterResult : candidateResults) {
+ Id id = candidateFilterResult.getValue().getCandidateResult().getId();
+ if (id != null) {
+ UUID uuid = id.getUuid();
+ // keep only the first candidate seen for each UUID
+ if (!foundUUIDs.contains(uuid)) {
+ dedupedCandidateResults.add(candidateFilterResult);
+ foundUUIDs.add(uuid);
+ }
+ }
+ }
}
- indexProducer.put(batch.build()).toBlocking().lastOrDefault(null); // want to rethrow if batch fails
+ // only the de-duplicated candidates are validated
+ for (final FilterResult<Candidate> candidateResult : dedupedCandidateResults) {
+ validate(candidateResult, keepStaleEntries, query, isDirectQuery);
+ }
+ // no index requests made for direct query, so no need to modify index
+ if (!isDirectQuery) {
+ indexProducer.put(batch.build()).toBlocking().lastOrDefault(null); // want to rethrow if batch fails
+ }
}
@@ -287,14 +342,78 @@
}
- private void validate( final FilterResult<Candidate> filterResult, boolean keepStaleEntries, String query ) {
+ /**
+ * Fill dedupedCandidateResults with, for each entity id, only the candidates carrying the newest
+ * version found in this result set. Older candidates are dropped and, when they also differ from
+ * the version loaded from the database, queued for deindexing on the batch.
+ *
+ * Whether stale entries are kept does not matter here: this only removes candidates that are
+ * older than another candidate in the same result set.
+ *
+ * @param query the query string, used only in log output
+ */
+ private void filterDuplicateCandidates(String query) {
+
+ final Map<Id, UUID> latestEntityVersions = new HashMap<>();
+
+ // first pass: find the latest candidate version for each entity id
+ for ( final FilterResult<Candidate> filterResult : candidateResults ) {
+ final CandidateResult candidateResult = filterResult.getValue().getCandidateResult();
+ final Id candidateId = candidateResult.getId();
+ final UUID candidateVersion = candidateResult.getVersion();
+
+ final UUID previousCandidateVersion = latestEntityVersions.get(candidateId);
+ final boolean isNewer = previousCandidateVersion == null ||
+ UUIDComparator.staticCompare(candidateVersion, previousCandidateVersion) > 0;
+ if (isNewer) {
+ latestEntityVersions.put(candidateId, candidateVersion);
+ }
+ }
+
+ // second pass: keep the newest candidates, deindex older ones
+ for ( final FilterResult<Candidate> filterResult : candidateResults ) {
+ final Candidate candidate = filterResult.getValue();
+ final CandidateResult candidateResult = candidate.getCandidateResult();
+ final Id candidateId = candidateResult.getId();
+ final UUID candidateVersion = candidateResult.getVersion();
+
+ if (candidateVersion.equals(latestEntityVersions.get(candidateId))) {
+ // save candidate
+ dedupedCandidateResults.add(filterResult);
+ continue;
+ }
+
+ final MvccEntity entity = entitySet.getEntity( candidateId );
+ if (entity == null) {
+ // The entity was not loaded from the database (validate() treats this case as a possible
+ // region sync issue). Drop the older candidate without deindexing rather than failing
+ // with a NullPointerException on entity.getVersion().
+ if (logger.isDebugEnabled()) {
+ logger.debug( "Skipping older candidate for entityId {} and version {}, entity not found in entity set",
+ candidateId, candidateVersion );
+ }
+ continue;
+ }
+
+ // deindex if not the current version in database
+ final UUID databaseVersion = entity.getVersion();
+ if (!candidateVersion.equals(databaseVersion)) {
+ Date candidateTimeStamp = UUIDTimeStampToDate(candidateVersion);
+ Date entityTimeStamp = UUIDTimeStampToDate(databaseVersion);
+
+ logger.warn( "Found old stale entity on edge {} for entityId {} Entity version {} ({}). Candidate version {} ({}). Will not be returned in result set. Query = [{}]",
+ candidate.getSearchEdge(),
+ entity.getId().getUuid(),
+ databaseVersion,
+ DateUtils.instance.formatIso8601Date(entityTimeStamp),
+ candidateVersion,
+ DateUtils.instance.formatIso8601Date(candidateTimeStamp),
+ query
+ );
+
+ final SearchEdge searchEdge = candidate.getSearchEdge();
+ batch.deindex(searchEdge, entity.getId(), candidateVersion);
+ }
+ }
+ }
+
+
+ private void validate( final FilterResult<Candidate> filterResult, boolean keepStaleEntries, String query,
+ boolean isDirectQuery) {
final Candidate candidate = filterResult.getValue();
final CandidateResult candidateResult = candidate.getCandidateResult();
final boolean isGeo = candidateResult instanceof GeoCandidateResult;
final SearchEdge searchEdge = candidate.getSearchEdge();
final Id candidateId = candidateResult.getId();
- final UUID candidateVersion = candidateResult.getVersion();
+ UUID candidateVersion = candidateResult.getVersion();
final MvccEntity entity = entitySet.getEntity( candidateId );
@@ -302,9 +421,14 @@
//doesn't exist warn and drop
if ( entity == null ) {
- logger.warn(
- "Searched and received candidate with entityId {} and version {}, yet was not found in cassandra. Ignoring since this could be a region sync issue",
- candidateId, candidateVersion );
+ if (!isDirectQuery) {
+ logger.warn(
+ "Searched and received candidate with entityId {} and version {}, yet was not found in cassandra. Ignoring since this could be a region sync issue",
+ candidateId, candidateVersion);
+ } else {
+ logger.warn(
+ "Direct query for entityId {} was not found in cassandra, query=[{}]", candidateId, query);
+ }
//TODO trigger an audit after a fail count where we explicitly try to repair from other regions
@@ -314,29 +438,42 @@
}
- final UUID entityVersion = entity.getVersion();
+ final UUID databaseVersion = entity.getVersion();
+ if (isDirectQuery) {
+ // use returned (latest) version for direct query
+ candidateVersion = databaseVersion;
+ }
final Id entityId = entity.getId();
- //entity is newer than ES version, could be a missed or slow index event
- if ( UUIDComparator.staticCompare(entityVersion, candidateVersion) > 0 ) {
+ // The entity is marked as deleted
+ if (!entity.getEntity().isPresent() || entity.getStatus() == MvccEntity.Status.DELETED ) {
+
+ // when updating entities, we don't delete all previous versions from ES so this action is expected
+ if(logger.isDebugEnabled()){
+ logger.debug( "Deindexing deleted entity on edge {} for entityId {} and version {}",
+ searchEdge, entityId, databaseVersion);
+ }
+
+ batch.deindex( searchEdge, entityId, candidateVersion );
+ return;
+ }
+
+ // entity exists and is newer than ES version, could be a missed or slow index event
+ if ( UUIDComparator.staticCompare(databaseVersion, candidateVersion) > 0 ) {
Date candidateTimeStamp = UUIDTimeStampToDate(candidateVersion);
- Date entityTimeStamp = UUIDTimeStampToDate(entityVersion);
+ Date entityTimeStamp = UUIDTimeStampToDate(databaseVersion);
- Map<String,String> fields = new HashMap<>();
- for (Field field : entity.getEntity().get().getFields()) {
- fields.put(field.getName(),String.valueOf(field.getValue()));
- }
-
- logger.warn( "Found stale entity on edge {} for entityId {} Entity version date = {}. Candidate version date = {}. Will be returned in result set = {} Query = [{}] Entity fields = {}",
- searchEdge,
- entityId.getUuid(),
- DateUtils.instance.formatIso8601Date(entityTimeStamp),
- DateUtils.instance.formatIso8601Date(candidateTimeStamp),
- keepStaleEntries,
- query,
- fields
- );
+ logger.warn( "Found stale entity on edge {} for entityId {} Entity version {} ({}). Candidate version {} ({}). Will be returned in result set = {} Query = [{}]",
+ candidate.getSearchEdge(),
+ entity.getId().getUuid(),
+ databaseVersion,
+ DateUtils.instance.formatIso8601Date(entityTimeStamp),
+ candidateVersion,
+ DateUtils.instance.formatIso8601Date(candidateTimeStamp),
+ keepStaleEntries,
+ query
+ );
if (!keepStaleEntries) {
batch.deindex(searchEdge, entityId, candidateVersion);
@@ -344,34 +481,20 @@
}
}
-
- // The entity is marked as deleted
- if (!entity.getEntity().isPresent() || entity.getStatus() == MvccEntity.Status.DELETED ) {
-
- // when updating entities, we don't delete previous versions from ES so this action is expected
- if(logger.isDebugEnabled()){
- logger.debug( "Deindexing deleted entity on edge {} for entityId {} and version {}",
- searchEdge, entityId, entityVersion);
- }
-
- batch.deindex( searchEdge, entityId, candidateVersion );
- return;
- }
-
//ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
//remove the ES record, since the read in cass should cause a read repair, just ignore
- if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
+ if ( UUIDComparator.staticCompare( candidateVersion, databaseVersion ) > 0 ) {
logger.warn(
"Found a newer version in ES over cassandra for edge {} for entityId {} and version {}. Repair should be run",
- searchEdge, entityId, entityVersion);
+ searchEdge, entityId, databaseVersion);
//TODO trigger an audit after a fail count where we explicitly try to repair from other regions
return;
}
- //they're the same add it
+ // add the result
final Entity returnEntity = entity.getEntity().get();
if(isGeo){
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationRestorePasswordService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationRestorePasswordService.java
new file mode 100644
index 0000000..99ef57d
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationRestorePasswordService.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.corepersistence.service;
+
+import java.util.UUID;
+
+/**
+ * Per-application storage of a "restore password", keyed by application id.
+ * NOTE(review): what the password is used for is not visible from this interface — confirm with callers.
+ */
+public interface ApplicationRestorePasswordService {
+ /**
+ * Look up the restore password recorded for the given application.
+ * NOTE(review): the result when none is stored is up to the implementation — presumably null; confirm.
+ */
+ String getApplicationRestorePassword(final UUID applicationId);
+ /** Record restorePassword as the restore password of the given application. */
+ void setApplicationRestorePassword(final UUID applicationId, final String restorePassword);
+ /** Remove the restore password recorded for the given application. */
+ void removeApplicationRestorePassword(final UUID applicationId);
+}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationRestorePasswordServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationRestorePasswordServiceImpl.java
new file mode 100644
index 0000000..e56bba7
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationRestorePasswordServiceImpl.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.corepersistence.service;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import org.apache.commons.lang.StringUtils;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import java.util.UUID;
+
+/**
+ * {@link ApplicationRestorePasswordService} implementation that keeps each application's restore password
+ * in a {@link MapManager} scoped to that application and the {@code appRestorePassword} namespace.
+ */
+public class ApplicationRestorePasswordServiceImpl implements ApplicationRestorePasswordService {
+
+    /** Map scope name under which restore passwords are kept. */
+    static final String restorePasswordNamespace = "appRestorePassword";
+
+    /** Key of the password entry inside the scoped map. */
+    static final String passwordKey = "password";
+
+    private final MapManagerFactory mapManagerFactory;
+
+    @Inject
+    public ApplicationRestorePasswordServiceImpl(final MapManagerFactory mapManagerFactory) {
+        this.mapManagerFactory = mapManagerFactory;
+    }
+
+    /**
+     * Creates the map manager for the restore-password scope of the given application.
+     */
+    private MapManager restorePasswordMap(final UUID applicationId) {
+        final Id scopeOwner = CpNamingUtils.generateApplicationId(applicationId);
+        final MapScopeImpl scope = new MapScopeImpl(scopeOwner, restorePasswordNamespace);
+        return mapManagerFactory.createMapManager(scope);
+    }
+
+    /**
+     * Reads the value stored under the password key for the application.
+     *
+     * @throws NullPointerException if {@code applicationId} is null
+     */
+    @Override
+    public String getApplicationRestorePassword(final UUID applicationId) {
+        Preconditions.checkNotNull(applicationId, "app id is null");
+        return restorePasswordMap(applicationId).getString(passwordKey);
+    }
+
+    /**
+     * Writes the restore password under the password key for the application.
+     *
+     * @throws NullPointerException if {@code applicationId} is null
+     * @throws IllegalArgumentException if {@code restorePassword} is null or empty
+     */
+    @Override
+    public void setApplicationRestorePassword(final UUID applicationId, final String restorePassword) {
+        Preconditions.checkNotNull(applicationId, "app id is null");
+        Preconditions.checkArgument(!StringUtils.isEmpty(restorePassword), "restorePassword is empty");
+        restorePasswordMap(applicationId).putString(passwordKey, restorePassword);
+    }
+
+    /**
+     * Deletes the password entry for the application.
+     *
+     * @throws NullPointerException if {@code applicationId} is null
+     */
+    @Override
+    public void removeApplicationRestorePassword(final UUID applicationId) {
+        Preconditions.checkNotNull(applicationId, "app id is null");
+        restorePasswordMap(applicationId).delete(passwordKey);
+    }
+}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
index 259e4b9..c476f1f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
@@ -84,8 +84,11 @@
final Optional<String> query = search.getQuery();
final IdBuilder pipelineBuilder =
- pipelineBuilderFactory.create( search.getApplicationScope() ).withCursor( search.getCursor() )
- .withLimit( search.getLimit() ).fromId( search.getSourceNodeId() );
+ pipelineBuilderFactory.create( search.getApplicationScope() )
+ .withCursor( search.getCursor() )
+ .withLimit( search.getLimit() )
+ .query(query)
+ .fromId( search.getSourceNodeId() );
//we want to load all entities
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
index c9752c3..c0e64a6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
@@ -276,6 +276,10 @@
public Map<Object, Object> getDictionaryAsMap( EntityRef entityRef, String dictionaryName )
throws Exception;
+ public Map<Object, Object> getDictionaryAsMap( EntityRef entityRef, String dictionaryName,
+ boolean forceVerification )
+ throws Exception;
+
public Object getDictionaryElementValue( EntityRef entityRef, String dictionaryName,
String elementName ) throws Exception;
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java b/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java
index c5833af..51b20c6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java
@@ -23,10 +23,14 @@
import org.apache.usergrid.persistence.Query.Level;
import org.apache.usergrid.utils.InflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class PathQuery<E> {
+ private static final Logger logger = LoggerFactory.getLogger( PathQuery.class );
+
private PathQuery source;
private Query query;
private UUID uuid;
@@ -122,10 +126,30 @@
EntityRef ref = new SimpleEntityRef(type,uuid);
- // if it's a single name identifier, just directly fetch that
- if ( !query.getQl().isPresent() && query.getSingleNameOrEmailIdentifier() != null){
+ if ( !query.getQl().isPresent() && query.getSingleUuidIdentifier() != null) {
+ // fetch entity by UUID directly
+ UUID entityUUID = query.getSingleUuidIdentifier();
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("getHeadResults(): identified path uuid: appid={}, collection={}, entityUUID={}",
+ uuid.toString(), query.getCollection(), entityUUID.toString());
+ }
+
+ String entityType = InflectionUtils.singularize(query.getCollection());
+
+ return em.getEntities(Collections.singletonList(entityUUID), entityType);
+
+ } else if ( !query.getQl().isPresent() && query.getSingleNameOrEmailIdentifier() != null){
+
+ // if it's a single name identifier, just directly fetch that
String name = query.getSingleNameOrEmailIdentifier();
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("getHeadResults(): identified path name/email: appid={}, collection={}, name={}",
+ uuid.toString(), query.getCollection(), name);
+ }
+
String entityType = InflectionUtils.singularize(query.getCollection());
// don't use unique index repair on read only logic
@@ -140,6 +164,12 @@
return em.getEntities(Collections.singletonList(entityId), entityType);
}
+ if (logger.isTraceEnabled()) {
+ logger.trace("getHeadResults(): sending query to ES: appid={}, collection={}, query=[{}]",
+ uuid.toString(), query.getCollection(),
+ query.getQl().isPresent() ? query.getQl().get() : "NONE");
+ }
+
return ( query.getCollection() != null ) ?
em.searchCollection( ref, query.getCollection(), query ) :
em.searchTargetEntities(ref, query);
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java
index 9e5598c..c22a627 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java
@@ -17,31 +17,31 @@
package org.apache.usergrid.persistence;
-import com.codahale.metrics.MetricRegistry;
-import com.google.inject.Injector;
-import net.jcip.annotations.NotThreadSafe;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
import org.apache.commons.lang.RandomStringUtils;
import org.apache.usergrid.AbstractCoreIT;
-import org.apache.usergrid.cassandra.SpringResource;
-import org.apache.usergrid.corepersistence.index.*;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.corepersistence.index.CollectionDeleteRequestBuilder;
+import org.apache.usergrid.corepersistence.index.CollectionDeleteService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
-
-import static org.junit.Assert.*;
+import com.codahale.metrics.MetricRegistry;
-@NotThreadSafe
+
public class CollectionDeleteTest extends AbstractCoreIT {
private static final Logger logger = LoggerFactory.getLogger( CollectionDeleteTest.class );
@@ -199,9 +199,10 @@
private int retryReadData(EntityManager em, String collectionName, int expectedEntities, int retry) throws Exception {
int count = -1;
+ Set<Entity> uniqueRemEnts = new HashSet<Entity>();
do {
try {
- count = readData(em, collectionName, expectedEntities);
+ count = readData(em, collectionName, expectedEntities, uniqueRemEnts);
} catch (Exception ignore) {
logger.info( "caught exception ", ignore);
}
@@ -211,7 +212,7 @@
return count;
}
- private int readData(EntityManager em, String collectionName, int expectedEntities)
+ private int readData(EntityManager em, String collectionName, int expectedEntities, Set<Entity> uniqueRemEnts)
throws Exception {
app.waitForQueueDrainAndRefreshIndex();
@@ -219,45 +220,44 @@
Results results = em.getCollection(em.getApplicationRef(), collectionName, null, expectedEntities,
Query.Level.ALL_PROPERTIES, false);
- int count = 0;
- List<Entity> list = new ArrayList<>();
+
while ( true ) {
if (results.getEntities().size() == 0) {
break;
}
+
UUID lastEntityUUID = null;
for ( Entity e : results.getEntities() ) {
assertEquals(2000, e.getProperty("key2"));
- if (count % 100 == 0) {
- logger.info("read {} entities", count);
+ if (uniqueRemEnts.size() % 100 == 0) {
+ logger.info("read {} entities", uniqueRemEnts.size());
}
lastEntityUUID = e.getUuid();
- count++;
- list.add(e);
+ uniqueRemEnts.add(e);
+ logger.info("Found remaining entity {}", lastEntityUUID);
}
results = em.getCollection(em.getApplicationRef(), collectionName, lastEntityUUID, expectedEntities,
Query.Level.ALL_PROPERTIES, false);
}
- logger.info("read {} total entities", count);
- if (count != expectedEntities) {
- logger.info("Expected {} did not match actual {}", expectedEntities, count);
- if (count < 20) {
- for (Entity e : list) {
+ if (uniqueRemEnts.size() != expectedEntities) {
+ logger.info("Expected {} did not match actual {}", expectedEntities, uniqueRemEnts.size());
+ if (uniqueRemEnts.size() < 20) {
+ for (Entity e : uniqueRemEnts) {
Object key = e.getProperty("key2");
- logger.info("Entity key {} ceated {}", key, e.getCreated());
+ logger.info("Entity key {} uuid {} created {}", key,e.getUuid(), e.getCreated());
}
}
}
- assertEquals( "Did not get expected entities", expectedEntities, count );
- return count;
+ assertEquals( "Did not get expected entities", expectedEntities, uniqueRemEnts.size() );
+ return uniqueRemEnts.size();
}
private int countEntities( EntityManager em, String collectionName, int expectedEntities)
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
index 57962c0..90af5ba 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
@@ -154,12 +154,12 @@
ReIndexService.ReIndexStatus status = reIndexService.rebuildIndex(builder);
- assertNotNull(status.getJobId(), "JobId is present");
+ assertNotNull("Collection name is present", status.getCollectionName()); // JUnit takes the message first; the swapped order only asserted the literal
logger.info("Rebuilt index");
- waitForRebuild(status, reIndexService);
+ waitForRebuild(em.getApplicationId().toString(), status.getCollectionName(), reIndexService);
//app.waitForQueueDrainAndRefreshIndex(15000);
@@ -279,7 +279,7 @@
logger.info("Rebuilt index, jobID={}", status.getJobId());
- waitForRebuild(status, reIndexService);
+ waitForRebuild(status.getJobId(), reIndexService);
logger.info("Rebuilt index");
@@ -387,7 +387,7 @@
logger.info("Rebuilt index");
- waitForRebuild(status, reIndexService);
+ waitForRebuild(status.getJobId(), reIndexService);
logger.info("Rebuilt index");
@@ -486,7 +486,7 @@
logger.info("Rebuilt index");
- waitForRebuild(status, reIndexService);
+ waitForRebuild(status.getJobId(), reIndexService);
logger.info("Rebuilt index");
@@ -505,18 +505,53 @@
/**
* Wait for the rebuild to occur
*/
- private void waitForRebuild(final ReIndexService.ReIndexStatus status, final ReIndexService reIndexService)
+ private void waitForRebuild(final String jobId, final ReIndexService reIndexService)
throws InterruptedException, IllegalArgumentException {
- if (status != null) {
- logger.info("waitForRebuild: jobID={}", status.getJobId());
+ if (jobId != null && !jobId.trim().equals("")) {
+ logger.info("waitForRebuild: jobID={}", jobId);
} else {
- logger.info("waitForRebuild: error, status = null");
- throw new IllegalArgumentException("reindexStatus = null");
+ logger.info("waitForRebuild: error, jobId = null or empty");
+ throw new IllegalArgumentException("jobId = null or empty");
}
while (true) {
try {
- final ReIndexService.ReIndexStatus updatedStatus = reIndexService.getStatus(status.getJobId());
+ final ReIndexService.ReIndexStatus updatedStatus = reIndexService.getStatus(jobId);
+
+ if (updatedStatus == null) {
+ logger.info("waitForRebuild: updated status is null");
+ } else {
+ logger.info("waitForRebuild: status={} numberProcessed={}", updatedStatus.getStatus().toString(), updatedStatus.getNumberProcessed());
+
+ if (updatedStatus.getStatus() == ReIndexService.Status.COMPLETE) {
+ break;
+ }
+ }
+ } catch (IllegalArgumentException iae) {
+ //swallow. Thrown if our job can't be found, i.e. it hasn't been updated yet
+ }
+
+
+ Thread.sleep(1000);
+ }
+ }
+
+
+ /**
+ * Wait for the rebuild to occur
+ */
+ private void waitForRebuild(final String appId, final String collectionName, final ReIndexService reIndexService)
+ throws InterruptedException, IllegalArgumentException {
+ if (appId != null && !appId.trim().equals("") && collectionName != null && !collectionName.trim().equals("")) {
+ logger.info("waitForRebuild: appId={} collName={}", appId, collectionName);
+ } else {
+ logger.info("waitForRebuild: error, appId or collName = null or empty");
+ throw new IllegalArgumentException("appId or collName = null or empty");
+ }
+ while (true) {
+
+ try {
+ final ReIndexService.ReIndexStatus updatedStatus = reIndexService.getStatusForCollection(appId, collectionName);
if (updatedStatus == null) {
logger.info("waitForRebuild: updated status is null");
diff --git a/stack/corepersistence/queryindex/src/main/antlr3/org/apache/usergrid/persistence/index/query/tree/CpQueryFilter.g b/stack/corepersistence/queryindex/src/main/antlr3/org/apache/usergrid/persistence/index/query/tree/CpQueryFilter.g
index 605aed4..d75a617 100644
--- a/stack/corepersistence/queryindex/src/main/antlr3/org/apache/usergrid/persistence/index/query/tree/CpQueryFilter.g
+++ b/stack/corepersistence/queryindex/src/main/antlr3/org/apache/usergrid/persistence/index/query/tree/CpQueryFilter.g
@@ -112,7 +112,7 @@
GT : '>' | 'gt';
-GTE : '>=' | 'gte';
+GTE : '>=' | 'gte';
//keywords before var ids
@@ -134,11 +134,14 @@
OF : ('O'|'o')('F'|'f');
-UUID : HEX_DIGIT HEX_DIGIT HEX_DIGIT HEX_DIGIT
- HEX_DIGIT HEX_DIGIT HEX_DIGIT HEX_DIGIT '-'
- HEX_DIGIT HEX_DIGIT HEX_DIGIT HEX_DIGIT '-'
- HEX_DIGIT HEX_DIGIT HEX_DIGIT HEX_DIGIT '-'
- HEX_DIGIT HEX_DIGIT HEX_DIGIT HEX_DIGIT '-'
+DIRECT : ('D'|'d')('I'|'i')('R'|'r')('E'|'e')('C'|'c')('T'|'t');
+
+UUID :
+ HEX_DIGIT HEX_DIGIT HEX_DIGIT HEX_DIGIT
+ HEX_DIGIT HEX_DIGIT HEX_DIGIT HEX_DIGIT '-'
+ HEX_DIGIT HEX_DIGIT HEX_DIGIT HEX_DIGIT '-'
+ HEX_DIGIT HEX_DIGIT HEX_DIGIT HEX_DIGIT '-'
+ HEX_DIGIT HEX_DIGIT HEX_DIGIT HEX_DIGIT '-'
HEX_DIGIT HEX_DIGIT HEX_DIGIT HEX_DIGIT
HEX_DIGIT HEX_DIGIT HEX_DIGIT HEX_DIGIT
HEX_DIGIT HEX_DIGIT HEX_DIGIT HEX_DIGIT
@@ -156,18 +159,18 @@
| '.' ('0'..'9')+ EXPONENT?
| ('0'..'9')+ EXPONENT)
;
-
+
STRING
: '\'' ( ESC_SEQ | ~('\\'|'\'') )* '\''
;
-
+
WS : (' ' | '\t' | '\n' | '\r' | '\f')+ {$channel=HIDDEN;};
-
+
@@ -208,12 +211,19 @@
-property : ID<Property>;
+property :
+ ID<Property>
+ | ASC<Property>
+ | DESC<Property>
+ | CONTAINS<Property>
+ | WITHIN<Property>
+ | DIRECT<Property>
+ ;
containsproperty : ID<ContainsProperty>;
withinproperty : ID<WithinProperty>;
-
+
booleanliteral: BOOLEAN<BooleanLiteral>;
@@ -225,70 +235,96 @@
stringliteral :
STRING<StringLiteral>;
-
+
floatliteral :
FLOAT<FloatLiteral> ;
-//We delegate to each sub class literal so we can get each type
-value :
+idliteral :
+ ID<IdLiteral> ;
+
+directnameliteral :
+ ID<IdLiteral> ;
+
+directuuidliteral :
+ UUID<UUIDLiteral>;
+
+//We delegate to each sub class literal so we can get each type
+value :
booleanliteral
| longliteral
| uuidliteral
| stringliteral
| floatliteral
;
-
+directidliteral :
+ directnameliteral
+ | directuuidliteral
+ ;
//Every operand returns with the name of 'op'. This is used because all subtrees require operands,
-//this allows us to link the java code easily by using the same name as a converntion
+//this allows us to link the java code easily by using the same name as a convention
//begin search expressions
-
-//mathmatical equality operations
+
+//mathematical equality operations
equalityop :
property LT<LessThan>^ value
|property LTE<LessThanEqual>^ value
|property EQ<Equal>^ value
|property GT<GreaterThan>^ value
|property GTE<GreaterThanEqual>^ value
- ;
+ ;
//geo location search
locationop :
withinproperty WITHIN<WithinOperand>^ (floatliteral|longliteral) OF! (floatliteral|longliteral) ','! (floatliteral|longliteral);
-
+
//string search
containsop :
containsproperty CONTAINS<ContainsOperand>^ stringliteral;
+directidlist :
+ directidliteral (','! directidliteral)*;
+
//
operation :
- '('! expression ')'!
- | equalityop
- | locationop
- | containsop
+ '('! orexp ')'!
+ | equalityop
+ | locationop
+ | containsop
;
//negations of expressions
notexp :
//only link if we have the not
- NOT<NotOperand>^ operation
- |operation
+ NOT<NotOperand>^ operation
+ |operation
;
//and expressions contain operands. These should always be closer to the leaves of a tree, it allows
//for faster result intersection sooner in the query execution
andexp :
notexp (AND<AndOperand>^ notexp )*;
-
-
+
+
//or expression should always be after AND expressions. This will give us a smaller result set to union when evaluating trees
//also a root level expression
-expression :
+orexp :
andexp (OR<OrOperand>^ andexp )*;
+directexp :
+ DIRECT<DirectOperand>^ directidlist ;
+
+
+// can use DIRECT {UUID|ID}+ at the root
+expression :
+ directexp
+ | orexp
+ ;
+
+
//end expressions
@@ -300,14 +336,14 @@
//order clause
order
: (property direction?){
- String property = $property.text;
+ String property = $property.text;
String direction = $direction.text;
parsedQuery.addSort(new SortPredicate(property, direction));
-
+
};
//end order clauses
-
+
//Begin select clauses
select_subject
@@ -317,7 +353,7 @@
};
-
+
select_assign
: target=ID ':' source=ID {
@@ -326,9 +362,9 @@
};
-select_expr
- : ('*' | select_subject (',' select_subject) * | '{' select_assign (',' select_assign) * '}');
-
+select_expr
+ : ('*' | select_subject (',' select_subject) * | '{' select_assign (',' select_assign) * '}');
+
//end select clauses
ql returns [ParsedQuery parsedQuery]
@@ -337,7 +373,7 @@
if($expression.tree instanceof Operand){
parsedQuery.setRootOperand((Operand)$expression.tree);
}
-
+
retval.parsedQuery = parsedQuery;
diff --git a/stack/corepersistence/queryindex/src/main/java/CpQueryFilter.tokens b/stack/corepersistence/queryindex/src/main/java/CpQueryFilter.tokens
index 53a0817..a9e7106 100644
--- a/stack/corepersistence/queryindex/src/main/java/CpQueryFilter.tokens
+++ b/stack/corepersistence/queryindex/src/main/java/CpQueryFilter.tokens
@@ -1,4 +1,3 @@
-T__31=31
T__32=32
T__33=33
T__34=34
@@ -8,40 +7,42 @@
T__38=38
T__39=39
T__40=40
+T__41=41
AND=4
ASC=5
BOOLEAN=6
CONTAINS=7
DESC=8
-EQ=9
-ESC_SEQ=10
-EXPONENT=11
-FALSE=12
-FLOAT=13
-GT=14
-GTE=15
-HEX_DIGIT=16
-ID=17
-LONG=18
-LT=19
-LTE=20
-NOT=21
-OCTAL_ESC=22
-OF=23
-OR=24
-STRING=25
-TRUE=26
-UNICODE_ESC=27
-UUID=28
-WITHIN=29
-WS=30
-'('=31
-')'=32
-'*'=33
-','=34
-':'=35
-'order by'=36
-'select'=37
-'where'=38
-'{'=39
-'}'=40
+DIRECT=9
+EQ=10
+ESC_SEQ=11
+EXPONENT=12
+FALSE=13
+FLOAT=14
+GT=15
+GTE=16
+HEX_DIGIT=17
+ID=18
+LONG=19
+LT=20
+LTE=21
+NOT=22
+OCTAL_ESC=23
+OF=24
+OR=25
+STRING=26
+TRUE=27
+UNICODE_ESC=28
+UUID=29
+WITHIN=30
+WS=31
+'('=32
+')'=33
+'*'=34
+','=35
+':'=36
+'order by'=37
+'select'=38
+'where'=39
+'{'=40
+'}'=41
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResult.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResult.java
index 9b1898c..65c20f0 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResult.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResult.java
@@ -30,14 +30,59 @@
* An instance of a candidate result
*/
public class CandidateResult implements EntityVersion {
- private final Id entityId;
+ private Id entityId;
private final UUID entityVersion;
private final String docId;
+ private final String directEntityName;
+ private final UUID directEntityUUID;
+ private final String directEntityType;
public CandidateResult( Id entityId, UUID entityVersion, String docId ) {
this.entityId = entityId;
this.entityVersion = entityVersion;
this.docId = docId;
+ this.directEntityName = null;
+ this.directEntityUUID = null;
+ this.directEntityType = null;
+ }
+
+ public CandidateResult( Id entityId, CandidateResult sourceResult ) {
+ this.entityId = entityId;
+ this.entityVersion = sourceResult.entityVersion;
+ this.docId = sourceResult.docId;
+ this.directEntityName = sourceResult.directEntityName;
+ this.directEntityUUID = sourceResult.directEntityUUID;
+ this.directEntityType = sourceResult.directEntityType;
+ }
+
+ // direct query by name before resolution
+ public CandidateResult( String entityType, String directEntityName ) {
+ this.directEntityName = directEntityName;
+ this.directEntityUUID = null;
+ this.directEntityType = entityType;
+ this.entityId = null;
+ this.entityVersion = null;
+ this.docId = null;
+ }
+
+ // direct query by UUID before resolution
+ public CandidateResult( String entityType, UUID directEntityUUID ) {
+ this.directEntityUUID = directEntityUUID;
+ this.directEntityName = null;
+ this.directEntityType = entityType;
+ this.entityId = null;
+ this.entityVersion = null;
+ this.docId = null;
+ }
+
+ public boolean isDirectQueryName() {
+ return directEntityName != null;
+ }
+ public boolean isDirectQueryUUID() {
+ return directEntityUUID != null;
+ }
+ public boolean isDirectQuery() {
+ return isDirectQueryName() || isDirectQueryUUID();
}
@Override
@@ -54,25 +99,69 @@
return docId;
}
+ public String getDirectEntityName() {
+ return directEntityName;
+ }
+
+ public UUID getDirectEntityUUID() {
+ return directEntityUUID;
+ }
+
+ public String getDirectEntityType() {
+ return directEntityType;
+ }
+
+ // used to set the id of a direct-query candidate after resolution
+ public void setId(Id entityId) {
+ this.entityId = entityId;
+ }
+
@Override
public boolean equals( final Object o ) {
if ( this == o ) {
return true;
}
+ if ( o == null ) {
+ return false;
+ }
if ( !( o instanceof CandidateResult ) ) {
return false;
}
final CandidateResult that = ( CandidateResult ) o;
- if ( !entityId.equals( that.entityId ) ) {
+ if ( entityId == null && that.entityId != null) {
return false;
}
- if ( !entityVersion.equals( that.entityVersion ) ) {
+ if ( entityId != null && !entityId.equals( that.entityId ) ) {
return false;
}
- if ( !docId.equals( that.docId ) ) {
+ if ( entityVersion == null && that.entityVersion != null) {
+ return false;
+ }
+ if ( entityVersion != null && !entityVersion.equals( that.entityVersion ) ) {
+ return false;
+ }
+ if ( docId == null && that.docId != null) {
+ return false;
+ }
+ if ( docId != null && !docId.equals( that.docId ) ) {
+ return false;
+ }
+ if ( directEntityUUID == null ? that.directEntityUUID != null : !directEntityUUID.equals( that.directEntityUUID ) ) {
+ return false; // UUIDs are compared by value: distinct instances can carry the same id
+ }
+ if ( directEntityName == null && that.directEntityName != null) {
+ return false;
+ }
+ if ( directEntityName != null && !directEntityName.equals( that.directEntityName ) ) {
+ return false;
+ }
+ if ( directEntityType == null && that.directEntityType != null) {
+ return false;
+ }
+ if ( directEntityType != null && !directEntityType.equals( that.directEntityType ) ) {
return false;
}
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java
index b8f87b6..3a69706 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResults.java
@@ -38,11 +38,19 @@
private final List<CandidateResult> candidates;
private final Collection<SelectFieldMapping> getFieldMappings;
+ private final boolean isDirectQuery;
public CandidateResults( List<CandidateResult> candidates, final Collection<SelectFieldMapping> getFieldMappings) {
+ this(candidates, getFieldMappings, false);
+ }
+
+ public CandidateResults(List<CandidateResult> candidates,
+ final Collection<SelectFieldMapping> getFieldMappings,
+ final boolean isDirectQuery) {
this.candidates = candidates;
this.getFieldMappings = getFieldMappings;
offset = Optional.absent();
+ this.isDirectQuery = isDirectQuery;
}
@@ -82,6 +90,10 @@
return getFieldMappings;
}
+ public boolean isDirectQuery() {
+ return this.isDirectQuery;
+ }
+
/**
* Get the candidates
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
index afcfdd7..ebe6824 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
@@ -22,6 +22,7 @@
import org.apache.usergrid.persistence.core.CPManager;
import org.apache.usergrid.persistence.core.util.Health;
+import org.apache.usergrid.persistence.index.query.ParsedQuery;
import org.apache.usergrid.persistence.model.entity.Id;
import rx.Observable;
@@ -140,6 +141,25 @@
final int limit, final int offset, final Map<String, Class> fieldsWithType,
final boolean analyzeOnly, final boolean returnQuery);
+ /**
+ * Search on every document in the specified search edge using an already-parsed query. Also search by the types if specified
+ *
+ * @param searchEdge The edge to search on
+ * @param searchTypes The search types to search
+ * @param parsedQuery The parsed query to execute
+ * @param limit The limit of values to return
+ * @param offset The offset to query on
+ * @param fieldsWithType An optional param that allows the caller to provide schema related info which might
+ * relate to data in the query, such as sort predicate types
+ * @param analyzeOnly This optional param will instruct the query processing to only analyze the query and
+ * provide info but not actually execute the query.
+ * @param returnQuery This optional param will cause the index query to be returned instead of run.
+ * @return the candidate results for the search
+ */
+ CandidateResults search(final SearchEdge searchEdge, final SearchTypes searchTypes, final ParsedQuery parsedQuery,
+ final int limit, final int offset, final Map<String, Class> fieldsWithType,
+ final boolean analyzeOnly, final boolean returnQuery);
+
/**
* Same as search, just iterates all documents that match the index edge exactly.
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index 9b42da3..493a722 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -72,6 +72,8 @@
String USERGRID_QUERYANALYZER_ENFORCE = "usergrid.queryanalyzer.enforce";
+ String DIRECT_QUERY_MAX_ITEMS = "direct.query.max.items";
+
@@ -238,4 +240,8 @@
@Default("false")
@Key( USERGRID_QUERYANALYZER_ENFORCE )
boolean enforceQueryBreaker();
+
+ @Default("1000")
+ @Key( DIRECT_QUERY_MAX_ITEMS )
+ int directQueryMaxItems();
}
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/exceptions/TooManyDirectEntitiesException.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/exceptions/TooManyDirectEntitiesException.java
new file mode 100644
index 0000000..b3f2052
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/exceptions/TooManyDirectEntitiesException.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.index.exceptions;
+
+
+/**
+ * Thrown when the user attempts a "direct" operation that names more entities than the maximum allowed.
+ */
+public class TooManyDirectEntitiesException extends QueryException {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Number of entities the rejected request asked for. */
+    private final int numberItemsRequested;
+
+    /** Maximum number of entities allowed. */
+    private final int maxItemsAllowed;
+
+
+    /**
+     * Builds the exception with a message that reports both counts.
+     *
+     * @param numberItemsRequested number of entities requested
+     * @param maxItemsAllowed maximum number of entities allowed
+     */
+    public TooManyDirectEntitiesException(int numberItemsRequested, int maxItemsAllowed) {
+        super( "Exceeded maximum number of direct entities requested: "
+            + numberItemsRequested + " requested, limit is " + maxItemsAllowed );
+        this.numberItemsRequested = numberItemsRequested;
+        this.maxItemsAllowed = maxItemsAllowed;
+    }
+
+
+    /** @return the number of entities that were requested */
+    public int getNumberItemsRequested() {
+        return numberItemsRequested;
+    }
+
+
+    /** @return the maximum number of entities allowed */
+    public int getMaxNumberItems() {
+        return maxItemsAllowed;
+    }
+}
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index e3121e1..82be6ec 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -36,11 +36,9 @@
import org.apache.usergrid.persistence.core.util.StringUtils;
import org.apache.usergrid.persistence.index.*;
import org.apache.usergrid.persistence.index.ElasticSearchQueryBuilder.SearchRequestBuilderStrategyV2;
-import org.apache.usergrid.persistence.index.exceptions.IndexException;
-import org.apache.usergrid.persistence.index.exceptions.QueryAnalyzerException;
-import org.apache.usergrid.persistence.index.exceptions.QueryAnalyzerEnforcementException;
-import org.apache.usergrid.persistence.index.exceptions.QueryReturnException;
+import org.apache.usergrid.persistence.index.exceptions.*;
import org.apache.usergrid.persistence.index.migration.IndexDataVersions;
+import org.apache.usergrid.persistence.index.query.Identifier;
import org.apache.usergrid.persistence.index.query.ParsedQuery;
import org.apache.usergrid.persistence.index.query.ParsedQueryBuilder;
import org.apache.usergrid.persistence.index.query.SortPredicate;
@@ -437,23 +435,39 @@
public CandidateResults search( final SearchEdge searchEdge, final SearchTypes searchTypes, final String query,
final int limit, final int offset, final Map<String, Class> fieldsWithType,
final boolean analyzeOnly, final boolean returnQuery ) {
+ Preconditions.checkNotNull( query, "query cannot be null" );
+ final ParsedQuery parsedQuery = ParsedQueryBuilder.build(query);
+
+ return search(searchEdge, searchTypes, parsedQuery, limit, offset, fieldsWithType, analyzeOnly, returnQuery);
+ }
+
+ public CandidateResults search( final SearchEdge searchEdge, final SearchTypes searchTypes, final ParsedQuery parsedQuery,
+ final int limit, final int offset, final Map<String, Class> fieldsWithType,
+ final boolean analyzeOnly, final boolean returnQuery ) {
IndexValidationUtils.validateSearchEdge(searchEdge);
Preconditions.checkNotNull(searchTypes, "searchTypes cannot be null");
- Preconditions.checkNotNull( query, "query cannot be null" );
Preconditions.checkArgument( limit > 0, "limit must be > 0" );
SearchResponse searchResponse;
- final ParsedQuery parsedQuery = ParsedQueryBuilder.build(query);
-
if ( parsedQuery == null ){
throw new IllegalArgumentException("a null query string cannot be parsed");
}
+ if (parsedQuery.isDirectQuery() && parsedQuery.getDirectQueryItemCount() > indexFig.directQueryMaxItems()) {
+ throw new TooManyDirectEntitiesException(parsedQuery.getDirectQueryItemCount(), indexFig.directQueryMaxItems());
+ }
+
final QueryVisitor visitor = visitParsedQuery(parsedQuery);
+ List<Identifier> directIdentifiers = visitor.getDirectIdentifiers();
+ if (directIdentifiers != null && directIdentifiers.size() > 0) {
+ // this is a direct query
+ return buildCandidateResultsForDirectQuery(directIdentifiers, parsedQuery, searchTypes);
+ }
+
boolean hasGeoSortPredicates = false;
for (SortPredicate sortPredicate : parsedQuery.getSortPredicates() ){
@@ -644,7 +658,7 @@
/**
- * Parse the results and return the canddiate results
+ * Parse the results and return the candidate results
*/
private CandidateResults parseResults( final SearchResponse searchResponse, final ParsedQuery query,
final int limit, final int from, boolean hasGeoSortPredicates ) {
@@ -677,6 +691,33 @@
return candidateResults;
}
+
+ /**
+ * Build CandidateResults from the identifiers of a direct query, typing each with the first search type
+ */
+ private CandidateResults buildCandidateResultsForDirectQuery(final List<Identifier> directIdentifiers,
+ final ParsedQuery query,
+ final SearchTypes searchTypes) {
+ Preconditions.checkArgument(searchTypes.getTypes().length > 0, "Search type required");
+ String entityType = searchTypes.getTypes()[0];
+
+ List<CandidateResult> candidates = new ArrayList<>(directIdentifiers.size());
+
+ for (Identifier id : directIdentifiers) {
+ CandidateResult candidateResult = null;
+ if (id.isUUID()) {
+ candidateResult = new CandidateResult(entityType, id.getUUID());
+ } else if (id.isName()) {
+ candidateResult = new CandidateResult(entityType, id.getName());
+ }
+ candidates.add(candidateResult);
+ }
+
+ return new CandidateResults(candidates, query.getSelectFieldMappings(), true);
+ }
+
+
+
private List<CandidateResult> aggregateScrollResults(List<CandidateResult> candidates,
final SearchResponse searchResponse, final UUID markedVersion){
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsQueryVistor.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsQueryVistor.java
index 4dd0d24..7f08ee3 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsQueryVistor.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsQueryVistor.java
@@ -19,9 +19,10 @@
package org.apache.usergrid.persistence.index.impl;
-import java.util.Stack;
-import java.util.UUID;
+import java.util.*;
+import org.apache.usergrid.persistence.index.query.Identifier;
+import org.apache.usergrid.persistence.index.query.tree.*;
import org.elasticsearch.common.geo.GeoDistance;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.index.query.BoolFilterBuilder;
@@ -44,17 +45,6 @@
import org.apache.usergrid.persistence.index.exceptions.IndexException;
import org.apache.usergrid.persistence.index.exceptions.NoFullTextIndexException;
import org.apache.usergrid.persistence.index.exceptions.NoIndexException;
-import org.apache.usergrid.persistence.index.query.tree.AndOperand;
-import org.apache.usergrid.persistence.index.query.tree.ContainsOperand;
-import org.apache.usergrid.persistence.index.query.tree.Equal;
-import org.apache.usergrid.persistence.index.query.tree.GreaterThan;
-import org.apache.usergrid.persistence.index.query.tree.GreaterThanEqual;
-import org.apache.usergrid.persistence.index.query.tree.LessThan;
-import org.apache.usergrid.persistence.index.query.tree.LessThanEqual;
-import org.apache.usergrid.persistence.index.query.tree.NotOperand;
-import org.apache.usergrid.persistence.index.query.tree.OrOperand;
-import org.apache.usergrid.persistence.index.query.tree.QueryVisitor;
-import org.apache.usergrid.persistence.index.query.tree.WithinOperand;
import com.google.common.base.Optional;
@@ -79,6 +69,13 @@
private final GeoSortFields geoSortFields = new GeoSortFields();
+ /**
+ * Identifiers collected from a direct query (one that goes direct to C*, bypassing ES)
+ */
+ private final List<Identifier> directIdList = new ArrayList<>();
+
+
+
@Override
public void visit( AndOperand op ) throws IndexException {
@@ -323,6 +320,35 @@
@Override
+ public void visit( DirectOperand op ) {
+ List<Literal> idList = op.getDirectIds();
+
+ Set<Identifier> idSet = new HashSet<>();
+
+ for (Literal literal : idList) {
+
+ Identifier identifier = null;
+ if (literal instanceof IdLiteral) {
+ String name = ((IdLiteral)literal).getValue();
+ identifier = Identifier.fromName(name);
+ } else if (literal instanceof UUIDLiteral) {
+ UUID uuid = ((UUIDLiteral)literal).getValue();
+ identifier = Identifier.fromUUID(uuid);
+ }
+ // only IdLiteral and UUIDLiteral are expected here; any other literal type is ignored
+
+ if (identifier != null) {
+ // ignore if already seen
+ if (!idSet.contains(identifier)) {
+ directIdList.add(identifier);
+ idSet.add(identifier);
+ }
+ }
+ }
+ }
+
+
+ @Override
public void visit( LessThan op ) throws NoIndexException {
final String name = op.getProperty().getValue().toLowerCase();
final Object value = op.getLiteral().getValue();
@@ -472,6 +498,12 @@
}
+ @Override
+ public List<Identifier> getDirectIdentifiers() {
+ return directIdList;
+ }
+
+
/**
* Generate the field name term for the field name for queries
*/
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/ParsedQuery.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/ParsedQuery.java
index 1cb3ba2..4c0da5d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/ParsedQuery.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/ParsedQuery.java
@@ -34,6 +34,7 @@
import org.apache.usergrid.persistence.index.SelectFieldMapping;
import org.apache.usergrid.persistence.index.exceptions.QueryParseException;
+import org.apache.usergrid.persistence.index.query.tree.DirectOperand;
import org.apache.usergrid.persistence.index.query.tree.Operand;
@@ -206,4 +207,17 @@
public boolean isGeoQuery(){
return getOriginalQuery().contains("location") && getOriginalQuery().contains("within");
}
+
+ public boolean isDirectQuery() {
+ return rootOperand instanceof DirectOperand;
+ }
+
+ public int getDirectQueryItemCount() {
+ int count = 0;
+ if (rootOperand instanceof DirectOperand) {
+ DirectOperand root = (DirectOperand)rootOperand;
+ count = root.getChildCount();
+ }
+ return count;
+ }
}
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/DirectOperand.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/DirectOperand.java
new file mode 100644
index 0000000..796dc5e
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/DirectOperand.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.index.query.tree;
+
+
+import org.antlr.runtime.Token;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+
+public class DirectOperand extends Operand {
+
+ public DirectOperand(Token t ) {
+ super( t );
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.usergrid.persistence.index.query.tree.Operand#visit(org.apache.usergrid.persistence
+ * .index.query.tree.QueryVisitor)
+ */
+ @Override
+ public void visit( QueryVisitor visitor ) {
+ visitor.visit( this );
+ }
+
+
+ /**
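+ * @param name the name to wrap in an IdLiteral and set as a child of this operand
+ * @param index the child position at which the literal is set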
+ */
+ public void setDirectId( String name, int index ) {
+ setChild( index, new IdLiteral( name ) );
+ }
+
+
+ /**
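+ * @param uuid the UUID to wrap in a UUIDLiteral and set as a child of this operand
+ * @param index the child position at which the literal is set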
+ */
+ public void setDirectId(UUID uuid, int index ) {
+ setChild( index, new UUIDLiteral( uuid ) );
+ }
+
+ /**
+ * @param index the child position to read
+ * @return the child at that position, cast to Literal
+ */
+ public Literal getDirectId( int index ) {
+ return ( Literal ) this.children.get( index );
+ }
+
+ /**
+ * @return a new list holding every child of this operand, each cast to Literal
+ */
+ public List<Literal> getDirectIds() {
+ List<Literal> ids = new ArrayList<>();
+ for (Object child : this.children) {
+ ids.add((Literal) child);
+ }
+ return ids;
+ }
+
+}
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/IdLiteral.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/IdLiteral.java
new file mode 100644
index 0000000..1defdd3
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/IdLiteral.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+
+package org.apache.usergrid.persistence.index.query.tree;
+
+
+import org.antlr.runtime.ClassicToken;
+import org.antlr.runtime.Token;
+
+
+public class IdLiteral extends Literal<String> {
+
+ private String value;
+
+
+ public IdLiteral(Token t ) {
+ super( t );
+ value = t.getText();
+ }
+
+
+ public IdLiteral(String value ) {
+ super( new ClassicToken( 0, value ) );
+ this.value = value;
+ }
+
+
+ public String getValue() {
+ return this.value;
+ }
+}
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/QueryVisitor.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/QueryVisitor.java
index 273f23f..6c191cf 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/QueryVisitor.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/tree/QueryVisitor.java
@@ -19,19 +19,16 @@
package org.apache.usergrid.persistence.index.query.tree;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
+import java.util.List;
import org.apache.usergrid.persistence.index.exceptions.NoFullTextIndexException;
import org.apache.usergrid.persistence.index.exceptions.NoIndexException;
import org.apache.usergrid.persistence.index.exceptions.IndexException;
import org.apache.usergrid.persistence.index.impl.GeoSortFields;
-import org.apache.usergrid.persistence.index.query.SortPredicate;
+import org.apache.usergrid.persistence.index.query.Identifier;
import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.search.sort.GeoDistanceSortBuilder;
import com.google.common.base.Optional;
@@ -103,6 +100,12 @@
*/
void visit( GreaterThanEqual op ) throws NoIndexException;
+ /**
+ * Visit a direct operand, which holds the ids of a direct query.
+ * @param op the direct operand to visit
+ */
+ void visit( DirectOperand op );
+
/**
* Return any filters created during parsing
@@ -126,4 +129,9 @@
* @return The GeoSortFields null safe
*/
GeoSortFields getGeoSorts();
+
+ /**
+ * Return list of identifiers for direct query
+ */
+ List<Identifier> getDirectIdentifiers();
}
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/UUIDUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/UUIDUtils.java
index bf8dd3c..1ba9f3c 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/UUIDUtils.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/UUIDUtils.java
@@ -303,8 +303,11 @@
if ( uuid == null ) {
return 0;
}
- long t = uuid.timestamp();
- return ( t - KCLOCK_OFFSET ) / KCLOCK_MULTIPLIER_L;
+ return getUnixTimestampInMillisFromUUIDTimestamp(uuid.timestamp());
+ }
+
+ public static long getUnixTimestampInMillisFromUUIDTimestamp( long uuidTimestamp ) {
+ return ( uuidTimestamp - KCLOCK_OFFSET ) / KCLOCK_MULTIPLIER_L;
}
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/query/tree/GrammarTreeTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/query/tree/GrammarTreeTest.java
index 380057f..c459e43 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/query/tree/GrammarTreeTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/query/tree/GrammarTreeTest.java
@@ -681,9 +681,9 @@
}
assertEquals(
- "NoViableAltException('!'@[1:1: Tokens : ( T__31 | T__32 | T__33 | T__34 | T__35 | T__36 | T__37 | "
- + "T__38 | T__39 | T__40 | LT | LTE | EQ | GT | GTE | BOOLEAN | AND | OR | NOT | ASC | DESC |"
- + " CONTAINS | WITHIN | OF | UUID | ID | LONG | FLOAT | STRING | WS );])",
+ "NoViableAltException('!'@[1:1: Tokens : ( T__32 | T__33 | T__34 | T__35 | T__36 | T__37 | T__38 | T__39 | T__40 |"
+ +" T__41 | LT | LTE | EQ | GT | GTE | BOOLEAN | AND | OR | NOT | ASC | DESC | CONTAINS | WITHIN | OF | DIRECT |"
+ +" UUID | ID | LONG | FLOAT | STRING | WS );])",
error );
}
}
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index de1671d..83b65de 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -682,7 +682,7 @@
@Override
public <T extends Serializable> void sendMessageToLocalRegion(final T body, Boolean async) throws IOException {
- boolean sendAsync = async.booleanValue();
+ boolean sendAsync = async == null || async.booleanValue();
if (sendAsync) {
sendMessageToLocalRegionAsync(body);
} else {
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/IndexConsistency.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/IndexConsistency.java
index 531716a..fdbd399 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/IndexConsistency.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/IndexConsistency.java
@@ -51,7 +51,7 @@
public static IndexConsistency get(String name) {
IndexConsistency queueIndexingStrategy = NAME_MAP.get(name);
if (queueIndexingStrategy == null) {
- return LATEST;
+ return STRICT;
}
return queueIndexingStrategy;
}
diff --git a/stack/pom.xml b/stack/pom.xml
index c98c72d..438a6fd 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -114,10 +114,10 @@
<junit-version>4.12</junit-version>
<log4j-version>1.2.16</log4j-version>
<org.springframework.version>3.2.13.RELEASE</org.springframework.version>
- <shiro-version>1.2.4</shiro-version>
+ <shiro-version>1.3.2</shiro-version>
<slf4j-version>1.7.2</slf4j-version>
<snakeyaml-version>1.9</snakeyaml-version>
- <tomcat-version>7.0.64</tomcat-version>
+ <tomcat-version>7.0.84</tomcat-version>
<antlr.version>3.4</antlr.version>
<tika.version>1.4</tika.version>
<mockito.version>1.10.8</mockito.version>
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/ApiResponse.java b/stack/rest/src/main/java/org/apache/usergrid/rest/ApiResponse.java
index 727d187..684e8e1 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/ApiResponse.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/ApiResponse.java
@@ -613,9 +613,16 @@
public void setParams( Map<String, List<String>> params ) {
+ setParams(params, null);
+ }
+
+
+ // same filtering as IGNORE_QP, plus skips any key whose lower-cased form is in ignoreList (null allowed)
+ public void setParams( Map<String, List<String>> params, List<String> ignoreList ) {
Map<String, List<String>> q = new LinkedHashMap<>();
for ( String k : params.keySet() ) {
if (IGNORE_QP.contains(k.toLowerCase())) continue;
+ if (ignoreList != null && ignoreList.contains(k.toLowerCase())) continue;
List<String> v = params.get( k );
if ( v != null ) {
q.put( k, new ArrayList<>( v ) );
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
index b8abe54..6a42826 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
@@ -134,9 +134,10 @@
@JSONP
@Produces({MediaType.APPLICATION_JSON, "application/javascript"})
public ApiResponse getAllApplications2( @Context UriInfo ui,
- @QueryParam("callback") @DefaultValue("callback") String callback )
+ @QueryParam("deleted") @DefaultValue("false") Boolean deleted,
+ @QueryParam("callback") @DefaultValue("callback") String callback )
throws URISyntaxException {
- return getAllApplications( ui, false, callback );
+ return getAllApplications( ui, deleted, callback );
}
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
index 449eac9..8cd5e9e 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
@@ -287,26 +287,41 @@
}
- // TODO: this can't be controlled and until it can be controlled we shouldn' allow muggles to do this.
- // So system access only.
- // TODO: use scheduler here to get around people sending a reindex call 30 times.
+
@POST
@Path("{itemName}/_reindex")
@Produces({ MediaType.APPLICATION_JSON,"application/javascript"})
- @RequireSystemAccess
+ @RequireApplicationAccess
@JSONP
public ApiResponse executePostForReindexing(
- @Context UriInfo ui, String body,
+ @Context UriInfo ui, final Map<String, Object> payload,
@PathParam("itemName") PathSegment itemName,
@QueryParam("callback") @DefaultValue("callback") String callback ) throws Exception {
addItemToServiceContext( ui, itemName );
IndexResource indexResource = new IndexResource(injector);
- return indexResource.rebuildIndexesPost(
+ return indexResource.rebuildIndexCollectionPost(payload,
services.getApplicationId().toString(),itemName.getPath(),false,callback );
}
+ @GET
+ @Path("{itemName}/_reindex")
+ @Produces({ MediaType.APPLICATION_JSON,"application/javascript"})
+ @RequireApplicationAccess
+ @JSONP
+ public ApiResponse executeGetForReindexStatus(
+ @Context UriInfo ui, final Map<String, Object> payload,
+ @PathParam("itemName") PathSegment itemName,
+ @QueryParam("callback") @DefaultValue("callback") String callback ) throws Exception {
+
+ addItemToServiceContext( ui, itemName );
+
+ IndexResource indexResource = new IndexResource(injector);
+ return indexResource.rebuildIndexCollectionGet(services.getApplicationId().toString(), itemName.getPath(),
+ callback );
+ }
+
private CollectionDeleteService getCollectionDeleteService() {
return injector.getInstance( CollectionDeleteService.class );
@@ -346,18 +361,17 @@
private ApiResponse executeAndCreateResponse(final CollectionDeleteRequestBuilder request, final String callback ) {
- final CollectionDeleteService.CollectionDeleteStatus status = getCollectionDeleteService().deleteCollection( request );
+ final CollectionDeleteService.CollectionDeleteStatus status = getCollectionDeleteService().deleteCollection(request);
final ApiResponse response = createApiResponse();
- response.setAction( "clear collection" );
- response.setProperty( "jobId", status.getJobId() );
- response.setProperty( "status", status.getStatus() );
- response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() );
- response.setProperty( "numberQueued", status.getNumberProcessed() );
+ response.setAction("clear collection");
+ response.setProperty("jobId", status.getJobId());
+ response.setProperty("status", status.getStatus());
+ response.setProperty("lastUpdatedEpoch", status.getLastUpdated());
+ response.setProperty("numberQueued", status.getNumberProcessed());
response.setSuccess();
return response;
}
-
}
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/TooManyDirectEntitiesExceptionMapper.java b/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/TooManyDirectEntitiesExceptionMapper.java
new file mode 100644
index 0000000..06226a5
--- /dev/null
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/TooManyDirectEntitiesExceptionMapper.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.usergrid.rest.exceptions;
+
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.Provider;
+import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
+import org.apache.usergrid.persistence.index.exceptions.TooManyDirectEntitiesException;
+
+@Provider
+public class TooManyDirectEntitiesExceptionMapper extends AbstractExceptionMapper<TooManyDirectEntitiesException> {
+ @Override
+ public Response toResponse( final TooManyDirectEntitiesException e ) {
+ return toResponse( BAD_REQUEST, e );
+ }
+}
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java
index 79973c3..0eff6b7 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/management/organizations/applications/ApplicationResource.java
@@ -23,6 +23,7 @@
import org.apache.amber.oauth2.common.message.OAuthResponse;
import org.apache.commons.lang.NullArgumentException;
import org.apache.commons.lang.StringUtils;
+import org.apache.usergrid.corepersistence.service.ApplicationRestorePasswordService;
import org.apache.usergrid.management.ApplicationInfo;
import org.apache.usergrid.management.OrganizationInfo;
import org.apache.usergrid.management.export.ExportService;
@@ -44,6 +45,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
+import org.apache.usergrid.security.shiro.utils.SubjectUtils;
import javax.ws.rs.*;
import javax.ws.rs.core.Context;
@@ -73,6 +75,7 @@
private static final Logger logger = LoggerFactory.getLogger(ApplicationResource.class);
public static final String CONFIRM_APPLICATION_IDENTIFIER = "confirm_application_identifier";
+ public static final String RESTORE_PASSWORD = "restore_password";
//@Autowired
//protected ExportService exportService;
@@ -465,8 +468,23 @@
throw new IllegalArgumentException("Application ID not specified in request");
}
+ ApplicationRestorePasswordService restorePasswordService = getApplicationRestorePasswordService();
+ if (!SubjectUtils.isServiceAdmin()) {
+ // require password if it exists
+ String storedRestorePassword = restorePasswordService.getApplicationRestorePassword(applicationId);
+ if (StringUtils.isNotEmpty(storedRestorePassword)) {
+ // must have matching password as query parameter
+ String suppliedRestorePassword = ui.getQueryParameters().getFirst(RESTORE_PASSWORD);
+ if (!storedRestorePassword.equals(suppliedRestorePassword)) {
+ throw new IllegalArgumentException("Application cannot be restored without application password");
+ }
+ }
+ }
+
management.restoreApplication( applicationId );
+ // not deleting password -- will be changed upon successful soft delete
+
ApiResponse response = createApiResponse();
response.setAction( "restore" );
response.setApplication( emf.getEntityManager( applicationId ).getApplication() );
@@ -505,8 +523,23 @@
"Cannot delete application without supplying correct application name");
}
+ String restorePassword = null;
+ ApplicationRestorePasswordService restorePasswordService = getApplicationRestorePasswordService();
+ if (SubjectUtils.isServiceAdmin()) {
+ restorePassword = ui.getQueryParameters().getFirst(RESTORE_PASSWORD);
+ if (StringUtils.isNotEmpty(restorePassword)) {
+ // save password, required for future undelete if not sysadmin
+ restorePasswordService.setApplicationRestorePassword(applicationId, restorePassword);
+ }
+ }
+
management.deleteApplication( applicationId );
+ if (restorePassword == null) {
+ // clear restore password
+ restorePasswordService.removeApplicationRestorePassword(applicationId);
+ }
+
if (logger.isTraceEnabled()) {
logger.trace("ApplicationResource.delete() deleted appId = {}", applicationId);
}
@@ -523,4 +556,8 @@
return response;
}
+ private ApplicationRestorePasswordService getApplicationRestorePasswordService() {
+ return injector.getInstance(ApplicationRestorePasswordService.class);
+ }
+
}
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
index be60177..ec86750 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
@@ -28,13 +28,16 @@
import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilderImpl;
import org.apache.usergrid.corepersistence.index.ReIndexService;
+import org.apache.usergrid.exception.ConflictException;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.index.utils.ConversionUtils;
import org.apache.usergrid.persistence.index.utils.UUIDUtils;
import org.apache.usergrid.rest.AbstractContextResource;
import org.apache.usergrid.rest.ApiResponse;
import org.apache.usergrid.rest.RootResource;
+import org.apache.usergrid.rest.security.annotations.RequireOrganizationAccess;
import org.apache.usergrid.rest.security.annotations.RequireSystemAccess;
+import org.apache.usergrid.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Scope;
@@ -182,26 +185,78 @@
return executeResumeAndCreateResponse( payload, request, callback );
}
+ @RequireOrganizationAccess
+ @GET
+ @Path( "rebuild/" + RootResource.APPLICATION_ID_PATH + "/{collectionName}" )
+ @JSONP
+ @Produces({ MediaType.APPLICATION_JSON, "application/javascript" })
+ public ApiResponse rebuildIndexCollectionGet( @PathParam( "applicationId" ) final String applicationIdStr,
+ @PathParam( "collectionName" ) final String collectionName,
+ @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
- @RequireSystemAccess
+
+ throws Exception {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Getting re-index status for app: {}, collection: {}", applicationIdStr, collectionName);
+ }
+
+
+ ReIndexService.ReIndexStatus status = getReIndexService().getStatusForCollection(applicationIdStr, collectionName);
+
+ final ApiResponse response = createApiResponse();
+
+ response.setAction( "get rebuild index status" );
+ response.setProperty( "collection", status.getCollectionName() );
+ response.setProperty( "status", status.getStatus() );
+ response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() );
+ response.setProperty( "numberQueued", status.getNumberProcessed() );
+ response.setSuccess();
+
+ return response;
+ }
+
+ @RequireOrganizationAccess // POST variant: starts a re-index of one collection unless one is already in progress
@POST
@Path( "rebuild/" + RootResource.APPLICATION_ID_PATH + "/{collectionName}" )
@JSONP
@Produces({MediaType.APPLICATION_JSON, "application/javascript"})
- public ApiResponse rebuildIndexesPost( @PathParam( "applicationId" ) final String applicationIdStr,
- @PathParam( "collectionName" ) final String collectionName,
- @QueryParam( "reverse" ) @DefaultValue( "false" ) final Boolean reverse,
- @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
+ public ApiResponse rebuildIndexCollectionPost(final Map<String, Object> payload,
+ @PathParam( "applicationId" ) final String applicationIdStr,
+ @PathParam( "collectionName" ) final String collectionName,
+ @QueryParam( "reverse" ) @DefaultValue( "false" ) final Boolean reverse, // accepted but not referenced anywhere in this method body
+ @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
throws Exception {
+ ReIndexService.ReIndexStatus existingStatus =
+ getReIndexService().getStatusForCollection(applicationIdStr, collectionName);
- logger.info( "Rebuilding collection {} in application {}", collectionName, applicationIdStr );
+ if(existingStatus.getStatus().equals(ReIndexService.Status.INPROGRESS)){ // reject the request while a re-index of this collection is still running
+ throw new ConflictException("Re-index for collection currently in progress");
+ }
+
+ logger.info( "Re-indexing collection {} in application {}", collectionName, applicationIdStr );
final UUID appId = UUIDUtils.tryExtractUUID( applicationIdStr );
+
final ReIndexRequestBuilder request =
createRequest().withApplicationId( appId ).withCollection( collectionName );
+ Map<String,Object> newPayload = payload;
+ if(newPayload == null || !payload.containsKey( UPDATED_FIELD )){ // no body, or body without UPDATED_FIELD: fall back to a start timestamp of 0
+ newPayload = new HashMap<>(1);
+ newPayload.put(UPDATED_FIELD,0);
+ }
+
+ Preconditions.checkArgument(newPayload.get(UPDATED_FIELD) instanceof Number,
+ "Property \"updated\" in the payload must be a number in unix timestamp millis format" );
+
+ //add our updated timestamp to the request
+ if ( newPayload.containsKey( UPDATED_FIELD ) ) { // always true at this point: the key was either supplied by the caller or defaulted above
+ final long timestamp = ConversionUtils.getLong(newPayload.get(UPDATED_FIELD));
+ request.withStartTimestamp( timestamp );
+ }
+
return executeAndCreateResponse( request, callback );
}
@@ -214,7 +269,6 @@
public ApiResponse rebuildIndexesPut( final Map<String, Object> payload,
@PathParam( "applicationId" ) final String applicationIdStr,
@PathParam( "collectionName" ) final String collectionName,
- @QueryParam( "reverse" ) @DefaultValue( "false" ) final Boolean reverse,
@QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
throws Exception {
@@ -350,7 +404,15 @@
final ApiResponse response = createApiResponse();
response.setAction( "rebuild indexes" );
- response.setProperty( "jobId", status.getJobId() );
+
+ if(StringUtils.isNotEmpty(status.getJobId())){
+ response.setProperty( "jobId", status.getJobId() );
+ }
+
+ if(StringUtils.isNotEmpty(status.getCollectionName())){
+ response.setProperty( "collection", status.getCollectionName() );
+ }
+
response.setProperty( "status", status.getStatus() );
response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() );
response.setProperty( "numberQueued", status.getNumberProcessed() );
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/IndexResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/IndexResourceIT.java
index 28d6501..be71881 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/IndexResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/IndexResourceIT.java
@@ -25,6 +25,7 @@
import org.apache.usergrid.rest.test.resource.model.ApiResponse;
import org.apache.usergrid.rest.test.resource.model.QueryParameters;
import org.apache.usergrid.rest.test.resource.model.Token;
+import org.apache.usergrid.corepersistence.index.ReIndexService.Status;
import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
import org.junit.Ignore;
import org.junit.Test;
@@ -68,35 +69,51 @@
.get(clientSetup.getSuperuserName(),clientSetup.getSuperuserPassword());
QueryParameters queryParameters = new QueryParameters();
- queryParameters.addParam( "access_token",superUserToken.getAccessToken());
+ queryParameters.addParam("access_token", superUserToken.getAccessToken());
ApiResponse result = clientSetup.getRestClient()
- .pathResource( "system/index/rebuild/" + clientSetup.getAppUuid() + "/StOrElaTloNs" )
- .post( false, ApiResponse.class, null, queryParameters, true );
+ .pathResource("system/index/rebuild/" + clientSetup.getAppUuid() + "/StOrElaTloNs")
+ .post(false, ApiResponse.class, null, queryParameters, true);
assertNotNull(result);
+ assertEquals(Status.STARTED.name(), result.getStatus());
//try the reindex endpoint with all lowercase Characters
queryParameters = new QueryParameters();
- queryParameters.addParam( "access_token",clientSetup.getSuperuserToken().getAccessToken() );
+ queryParameters.addParam("access_token", superUserToken.getAccessToken());
result = clientSetup.getRestClient()
- .pathResource( "system/index/rebuild/"+clientSetup.getAppUuid()+"/storelatlons" )
- .post( false, ApiResponse.class,null,queryParameters,true);
- String status = result.getProperties().get("jobId").toString();
-
- assertNotNull( result );
+ .pathResource("system/index/rebuild/" + clientSetup.getAppUuid() + "/storelatlons")
+ .post(false, ApiResponse.class, null, queryParameters, true);
+
+ assertNotNull(result);
+ //at this point, this could return a result of the previous reindex, or if it has completed, it will create a new job
+ assertNotEquals(Status.UNKNOWN.name(), result.getStatus());
WebTarget res = clientSetup.getRestClient()
- .pathResource( "system/index/rebuild/" + result.getProperties()
- .get( "jobId" ).toString() )
+ .pathResource("system/index/rebuild/" + clientSetup.getAppUuid() + "/storelatlons")
.getTarget();
HttpAuthenticationFeature feature = HttpAuthenticationFeature.basicBuilder()
.credentials( "superuser", "superpassword" ).build();
result = res.register( feature ).request().get( ApiResponse.class );
-
- assertNotNull( result );
- assertEquals(status,result.getProperties().get("jobId").toString());
+ assertNotNull(result);
+
+ int retry = 0;
+ while(retry < 5 && !result.getStatus().equals(Status.COMPLETE.name())) {
+ try {
+ //hope reindex completes, if not, that's still ok
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+
+ }
+ result = res.register( feature ).request().get( ApiResponse.class );
+ retry++;
+ assertNotNull(result);
+ }
+
+ Map<String, Object> resultMap = result.getProperties();
+ assertNotNull( resultMap );
+ assertEquals(1,resultMap.get("numberQueued"));
}
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/AndOrQueryTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/AndOrQueryTest.java
index d49460c..61e2760 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/AndOrQueryTest.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/queries/AndOrQueryTest.java
@@ -24,6 +24,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import net.jcip.annotations.NotThreadSafe;
+
import java.io.IOException;
import java.util.List;
@@ -35,6 +37,7 @@
*
* @since 4.0
*/
+@NotThreadSafe
public class AndOrQueryTest extends QueryTestBase {
private static final Logger logger = LoggerFactory.getLogger(AndOrQueryTest.class);
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/ManagementService.java b/stack/services/src/main/java/org/apache/usergrid/management/ManagementService.java
index 3d0c9fb..e0ab95e 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/ManagementService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/ManagementService.java
@@ -110,6 +110,8 @@
UUID addApplicationToOrganization(UUID organizationId, Entity appInfo) throws Exception;
+ boolean deleteAdminUser( UUID userId ) throws Exception; // the implementation added in this change returns false, deleting nothing, while the user is still attached to an organization, and true after deleting the user
+
void deleteOrganizationApplication( UUID organizationId, UUID applicationId ) throws Exception;
void disableAdminUser( UUID userId ) throws Exception;
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
index ab93563..7e114e5 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
@@ -934,6 +934,36 @@
}
+    /**
+     * Deletes an admin user that is no longer attached to any organization. This is the
+     * reverse of doCreateAdmin plus the creation of the user entity.
+     *
+     * @param userId UUID of the admin user
+     * @return {@code false} (after logging) when the user is still attached to at least one
+     *         organization, in which case nothing is deleted; {@code true} once the stored
+     *         credentials and the user entity have been removed
+     * @throws Exception propagated from the organization lookup or any of the delete calls
+     */
+    @Override
+    public boolean deleteAdminUser( UUID userId ) throws Exception {
+
+        final BiMap<UUID, String> memberships = getOrganizationsForAdminUser( userId );
+
+        if ( memberships.isEmpty() ) {
+            final EntityRef adminRef = new SimpleEntityRef( User.ENTITY_TYPE, userId );
+
+            // credentials go first (mongo password, password, token), the entity itself last
+            deleteUserMongoPassword( smf.getManagementAppId(), adminRef );
+            deleteUserPassword( smf.getManagementAppId(), adminRef );
+            deleteUserToken( smf.getManagementAppId(), adminRef );
+
+            final EntityManager managementEm = emf.getEntityManager( smf.getManagementAppId() );
+            managementEm.delete( adminRef );
+
+            return true;
+        }
+
+        // an admin user that still belongs to an organization must not be deleted
+        logger.info("Cannot delete admin user {} -- admin user is attached to {} orgs", userId, memberships.size());
+        return false;
+    }
+
+
@Override
public UserInfo createAdminFromPrexistingPassword( UUID organizationId, User user, CredentialsInfo ci )
throws Exception {
@@ -3283,6 +3313,12 @@
}
+ /** Deletes the owner's USER_PASSWORD credentials entry in the given application by delegating to deleteCreds. */
+ protected void deleteUserPassword( UUID appId, EntityRef owner ) throws Exception {
+ deleteCreds( appId, owner, USER_PASSWORD );
+ }
+
+
/** read the user password credential's info */
protected CredentialsInfo readUserPasswordCredentials( UUID appId, UUID ownerId, String ownerType )
throws Exception {
@@ -3302,11 +3338,22 @@
}
+ /** Deletes the owner's USER_TOKEN credentials entry in the given application by delegating to deleteCreds. */
+ protected void deleteUserToken( UUID appId, EntityRef owner ) throws Exception {
+ deleteCreds( appId, owner, USER_TOKEN );
+ }
+
+
/** Write the mongo password */
protected void writeUserMongoPassword( UUID appId, EntityRef owner, CredentialsInfo password ) throws Exception {
writeCreds( appId, owner, password, USER_MONGO_PASSWORD );
}
+ /** Deletes the owner's USER_MONGO_PASSWORD credentials entry in the given application by delegating to deleteCreds. */
+ private void deleteUserMongoPassword( UUID appId, EntityRef owner) throws Exception {
+ deleteCreds( appId, owner, USER_MONGO_PASSWORD );
+ }
+
/** Read the mongo password */
protected CredentialsInfo readUserMongoPassword( UUID appId, UUID ownerId, String ownerType ) throws Exception {
@@ -3339,6 +3386,12 @@
}
+ private void deleteCreds( UUID appId, EntityRef owner, String key ) throws Exception { // shared helper behind the deleteUser* methods
+ EntityManager em = emf.getEntityManager( appId );
+ em.removeFromDictionary(owner, DICTIONARY_CREDENTIALS, key); // removes the single entry stored under `key` in the owner's credentials dictionary
+ }
+
+
private Set<CredentialsInfo> readUserPasswordHistory( UUID appId, UUID ownerId ) throws Exception {
EntityManager em = emf.getEntityManager( appId );
Entity owner = em.get( new SimpleEntityRef("user", ownerId ));
diff --git a/stack/services/src/test/java/org/apache/usergrid/NewOrgAppAdminRule.java b/stack/services/src/test/java/org/apache/usergrid/NewOrgAppAdminRule.java
index 659520e..892a440 100644
--- a/stack/services/src/test/java/org/apache/usergrid/NewOrgAppAdminRule.java
+++ b/stack/services/src/test/java/org/apache/usergrid/NewOrgAppAdminRule.java
@@ -120,6 +120,7 @@
* Create the org admin and application
*/
protected void before( Description description ) throws Exception {
+ logger.info( "Test {}: Starting with application", description.getDisplayName() );
final String className = description.getClassName();
final String methodName = description.getMethodName();
final String uuidString = newUUIDString();
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/OrganizationConfigIT.java b/stack/services/src/test/java/org/apache/usergrid/management/OrganizationConfigIT.java
index 6424dbd..d2932c2 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/OrganizationConfigIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/OrganizationConfigIT.java
@@ -32,12 +32,14 @@
import org.junit.Rule;
import org.junit.Test;
+import net.jcip.annotations.NotThreadSafe;
+
import java.util.*;
import static org.apache.usergrid.TestHelper.*;
import static org.junit.Assert.*;
-
+@NotThreadSafe
public class OrganizationConfigIT {
@Rule
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/export/ExportServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/management/export/ExportServiceIT.java
index 870b678..2268ea6 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/export/ExportServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/export/ExportServiceIT.java
@@ -59,6 +59,8 @@
import com.google.common.collect.ImmutableSet;
import com.google.inject.Module;
+import net.jcip.annotations.NotThreadSafe;
+
import static org.apache.usergrid.TestHelper.newUUIDString;
import static org.apache.usergrid.TestHelper.uniqueApp;
import static org.apache.usergrid.TestHelper.uniqueOrg;
@@ -74,6 +76,7 @@
*
*
*/
+@NotThreadSafe
public class ExportServiceIT {
private static final Logger logger = LoggerFactory.getLogger( ExportServiceIT.class );
diff --git a/stack/tools/pom.xml b/stack/tools/pom.xml
index b34b068..991ff7c 100644
--- a/stack/tools/pom.xml
+++ b/stack/tools/pom.xml
@@ -262,6 +262,12 @@
<version>${aws.version}</version>
</dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>2.8.1</version>
+ </dependency>
+
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/AppDeleter.java b/stack/tools/src/main/java/org/apache/usergrid/tools/AppDeleter.java
new file mode 100644
index 0000000..911e43f
--- /dev/null
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/AppDeleter.java
@@ -0,0 +1,656 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.tools;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.BiMap;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
+import org.apache.usergrid.management.OrganizationInfo;
+import org.apache.usergrid.management.UserInfo;
+import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.entities.Application;
+import org.apache.usergrid.persistence.index.impl.EsProvider;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.services.assets.data.BinaryStore;
+import org.apache.usergrid.tools.export.ExportEntity;
+import org.apache.usergrid.utils.StringUtils;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchType;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Scheduler;
+import rx.Subscriber;
+import rx.functions.Action0;
+import rx.functions.Action1;
+import rx.schedulers.Schedulers;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Delete all entities and connections of a Usergrid app.
+ */
+public class AppDeleter extends ExportingToolBase {
+ static final Logger logger = LoggerFactory.getLogger( AppDeleter.class );
+
+ private static final String ORGANIZATION_NAME = "organizationName";
+ private static final String APPLICATION_NAME = "applicationName";
+ private static final String DELETE_THREAD_COUNT = "deleteThreads";
+ private static final String PERFORM_DELETE = "performDelete";
+ private static final String LOG_EACH_ITEM = "logEachItem";
+
+ private static final String ACCESS_ID_PROPNAME = "AWS_ACCESS_KEY_ID";
+ private static final String SECRET_KEY_PROPNAME = "AWS_SECRET_KEY";
+ private static final String BUCKET_NAME_PROPNAME = "usergrid.binary.bucketname";
+
+ private static final String ALL_INDEXES = "*";
+ private static final String SCROLL_TIMEOUT = "5m";
+ private static final int SCROLL_SIZE = 10;
+
+ String applicationName;
+ String organizationName;
+
+ AtomicInteger entitiesFound = new AtomicInteger(0);
+ AtomicInteger entityDictionaryEntriesFound = new AtomicInteger(0);
+ AtomicInteger appDictionaryEntriesFound = new AtomicInteger(0);
+ AtomicInteger assetsFound = new AtomicInteger(0);
+ AtomicInteger esDocsFound = new AtomicInteger(0);
+ AtomicInteger orgAdminsFound = new AtomicInteger(0);
+
+ Scheduler deleteScheduler;
+ AmazonS3Client s3Client;
+ EsProvider esProvider;
+ IndexLocationStrategyFactory ilsf;
+
+ ObjectMapper mapper = new ObjectMapper();
+ Map<Thread, JsonGenerator> entityGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
+ Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
+
+ int deleteThreadCount = 10; // set via CLI option
+
+ BinaryStore binaryStore;
+
+ String logLineSeparator = "-------------------------------------------------------------------";
+
+
+    /**
+     * Extends the inherited command-line options with the ones this tool understands:
+     * the required organization name plus the optional application name, perform-delete
+     * switch, delete thread count and log-each-item switch. Every option takes an argument.
+     *
+     * @return the inherited options with the AppDeleter options added
+     */
+    @Override
+    @SuppressWarnings("static-access")
+    public Options createOptions() {
+
+        final Options toolOptions = super.createOptions();
+
+        // the organization is the only mandatory option
+        toolOptions.addOption( OptionBuilder.hasArg().isRequired(true).withType("")
+            .withDescription( "Organization Name -" + ORGANIZATION_NAME )
+            .create( ORGANIZATION_NAME ) );
+
+        toolOptions.addOption( OptionBuilder.hasArg().isRequired(false).withType("")
+            .withDescription( "Application Name -" + APPLICATION_NAME )
+            .create( APPLICATION_NAME ) );
+
+        toolOptions.addOption( OptionBuilder.hasArg().isRequired(false)
+            .withDescription( "Perform Delete -" + PERFORM_DELETE )
+            .create( PERFORM_DELETE ) );
+
+        toolOptions.addOption( OptionBuilder.hasArg().isRequired(false)
+            .withType("")
+            .withDescription( "Delete Threads -" + DELETE_THREAD_COUNT )
+            .create( DELETE_THREAD_COUNT ) );
+
+        toolOptions.addOption( OptionBuilder.hasArg().isRequired(false)
+            .withDescription( "Log each item -" + LOG_EACH_ITEM )
+            .create( LOG_EACH_ITEM ) );
+
+        return toolOptions;
+    }
+
+
+    /**
+     * Tool entry point.
+     *
+     * Selects the applications to process (a single application when -applicationName is
+     * given, otherwise every active and deleted application of the organization), runs
+     * {@link #handleApp} on each of them and, when the whole organization was selected,
+     * finishes with {@link #handleOrg}. Nothing is deleted unless -performDelete is "yes".
+     *
+     * @param line parsed command line
+     * @throws RuntimeException when no application matches the selection
+     * @throws Exception propagated from Spring start-up or from the organization handling
+     */
+    @Override
+    public void runTool(CommandLine line) throws Exception {
+
+        organizationName = line.getOptionValue( ORGANIZATION_NAME );
+        applicationName = line.getOptionValue( APPLICATION_NAME );
+
+        // both switches are only enabled by the literal value "yes" (any case); a missing option means "no"
+        final boolean performDelete = "yes".equalsIgnoreCase( line.getOptionValue( PERFORM_DELETE ) );
+        final boolean logEachItem = "yes".equalsIgnoreCase( line.getOptionValue( LOG_EACH_ITEM ) );
+
+        if (StringUtils.isNotEmpty( line.getOptionValue(DELETE_THREAD_COUNT) )) {
+            try {
+                deleteThreadCount = Integer.parseInt( line.getOptionValue(DELETE_THREAD_COUNT) );
+            } catch (NumberFormatException nfe) {
+                logger.error( "-" + DELETE_THREAD_COUNT + " must be specified as an integer. Aborting..." );
+                return;
+            }
+            if (deleteThreadCount < 1) {
+                // Executors.newFixedThreadPool rejects non-positive sizes with an IllegalArgumentException
+                logger.error( "-" + DELETE_THREAD_COUNT + " must be at least 1. Aborting..." );
+                return;
+            }
+        }
+
+        startSpring();
+
+        // S3 asset store
+        String accessId = (String)properties.get(ACCESS_ID_PROPNAME);
+        String secretKey = (String)properties.get(SECRET_KEY_PROPNAME);
+        String bucketName = (String)properties.get(BUCKET_NAME_PROPNAME);
+
+        AWSCredentials credentials = new BasicAWSCredentials(accessId, secretKey);
+        ClientConfiguration clientConfig = new ClientConfiguration();
+        // NOTE(review): the S3 client is configured for plain HTTP -- confirm this is intended
+        clientConfig.setProtocol(Protocol.HTTP);
+
+        s3Client = new AmazonS3Client(credentials, clientConfig);
+
+        // ES
+        ilsf = injector.getInstance(IndexLocationStrategyFactory.class);
+        esProvider = injector.getInstance(EsProvider.class);
+
+        ExecutorService deleteThreadPoolExecutor = Executors.newFixedThreadPool(deleteThreadCount);
+        deleteScheduler = Schedulers.from( deleteThreadPoolExecutor );
+
+        try {
+            setVerbose( line );
+
+            logger.info(logLineSeparator);
+
+            final boolean singleApp = StringUtils.isNotEmpty(applicationName);
+            final String orgPrefix = organizationName + "/";
+            final String selection = singleApp ? orgPrefix + applicationName : orgPrefix;
+            if (singleApp) {
+                logger.info("APPLICATION:");
+            } else {
+                logger.info("APPLICATIONS FOR ORG " + organizationName + ":");
+            }
+
+            boolean foundApps = false;
+            Map<String, UUID> activeAppMap = new HashMap<>();
+            for (Map.Entry<String, UUID> entry : emf.getApplications().entrySet()) {
+                if (matchesSelection(entry.getKey(), selection, singleApp)) {
+                    foundApps = true;
+                    logger.info("ACTIVE APP: {} - {}", entry.getKey(), entry.getValue().toString());
+                    activeAppMap.put(entry.getKey(), entry.getValue());
+                }
+            }
+
+            Map<String, UUID> deletedAppMap = new HashMap<>();
+            for (Map.Entry<String, UUID> entry : emf.getDeletedApplications().entrySet()) {
+                if (matchesSelection(entry.getKey(), selection, singleApp)) {
+                    foundApps = true;
+                    logger.info("DELETED APP: {} - {}", entry.getKey(), entry.getValue().toString());
+                    deletedAppMap.put(entry.getKey(), entry.getValue());
+                }
+            }
+            logger.info(logLineSeparator);
+
+            if (!foundApps) {
+                if (singleApp) {
+                    throw new RuntimeException( "Cannot find application " + organizationName + "/" + applicationName );
+                } else {
+                    throw new RuntimeException( "Cannot find applications for org " + organizationName );
+                }
+            }
+
+            for (Map.Entry<String, UUID> app : activeAppMap.entrySet()) {
+                final EntityManager em = emf.getEntityManager( app.getValue() );
+                handleApp(app.getValue(), app.getKey(), false, em, performDelete, bucketName, logEachItem);
+            }
+            for (Map.Entry<String, UUID> app : deletedAppMap.entrySet()) {
+                final EntityManager em = emf.getEntityManager( app.getValue() );
+                handleApp(app.getValue(), app.getKey(), true, em, performDelete, bucketName, logEachItem);
+            }
+
+            if (!singleApp) {
+                // handle org
+                handleOrg(organizationName, performDelete);
+            }
+        } finally {
+            // the pool's worker threads are non-daemon; release them once all work is done
+            deleteThreadPoolExecutor.shutdown();
+        }
+
+    }
+
+
+    /**
+     * Decides whether an "org/app" name belongs to the current selection. A single-application
+     * run requires an exact match, so that selecting "org/app" never also picks up "org/app2";
+     * a whole-organization run matches every name starting with "org/".
+     */
+    private static boolean matchesSelection(String orgAppName, String selection, boolean singleApp) {
+        return singleApp ? orgAppName.equals(selection) : orgAppName.startsWith(selection);
+    }
+
+
+    /**
+     * Lists the admin users of an organization and, when {@code performDelete} is set,
+     * rotates the organization's client secret, removes every admin from the organization
+     * and deletes the admins that belong to no other organization.
+     *
+     * @param organizationName name of the organization to process
+     * @param performDelete    {@code false} only lists what would be affected
+     * @throws RuntimeException when the organization cannot be found
+     * @throws Exception propagated from the management service calls that are not guarded below
+     */
+    private void handleOrg(String organizationName, boolean performDelete) throws Exception {
+        OrganizationInfo orgInfo = managementService.getOrganizationByName(organizationName);
+        if (orgInfo == null) {
+            // fail with a clear message instead of a NullPointerException on the next line
+            throw new RuntimeException( "Cannot find organization " + organizationName );
+        }
+        final UUID orgUUID = orgInfo.getUuid();
+
+        logger.info(logLineSeparator);
+        logger.info("ORGANIZATION: {}({})", organizationName, orgUUID);
+        logger.info(logLineSeparator);
+
+        if (performDelete) {
+            // generate a new client secret so the previous one stops being valid;
+            // a failure here is logged and does not stop the admin handling below
+            try {
+                String clientId = managementService.getClientIdForOrganization(orgUUID);
+                String oldClientSecret = managementService.getClientSecretForOrganization(orgUUID);
+                logger.info(logLineSeparator);
+                logger.info("OLD ORG CLIENT ID: {}", clientId);
+                logger.info("OLD ORG CLIENT SECRET: {}", oldClientSecret);
+                String newClientSecret = managementService.newClientSecretForOrganization(orgUUID);
+                logger.info("NEW ORG CLIENT SECRET: {}", newClientSecret);
+                logger.info(logLineSeparator);
+            } catch (Exception e) {
+                logger.error("FAILED TO CHANGE CREDENTIALS FOR ORG " + organizationName + ": " + e.getMessage(), e);
+            }
+        }
+
+        List<UserInfo> userList = managementService.getAdminUsersForOrganization(orgUUID);
+
+        logger.info(logLineSeparator);
+        logger.info("ORGANIZATION ADMINS: {}({})", organizationName, orgUUID);
+        logger.info(logLineSeparator);
+        orgAdminsFound.set(0);
+        for (UserInfo user : userList) {
+            orgAdminsFound.incrementAndGet();
+            BiMap<UUID, String> adminOrgs = managementService.getOrganizationsForAdminUser(user.getUuid());
+            // counted before the removal below, so it still includes this organization
+            int numOrgs = adminOrgs.size();
+            logger.info("ORGADMIN: {} ({}) - number of other orgs: {}", user.getUsername(), user.getEmail(), numOrgs - 1);
+            if (performDelete) {
+                managementService.removeAdminUserFromOrganization(user.getUuid(), orgUUID, true);
+                if (numOrgs <= 1) {
+                    logger.info("ORGADMIN {} is in no other orgs -- deleting", user.getUsername());
+                    try {
+                        boolean success = managementService.deleteAdminUser(user.getUuid());
+                        if (!success) {
+                            logger.warn("ORGADMIN {} - failed to delete", user.getUsername());
+                        }
+                    } catch (Exception e) {
+                        // keep going with the remaining admins, but make the failure visible
+                        logger.error("ORGADMIN " + user.getUsername() + " - exception while deleting: " + e.getMessage(), e);
+                    }
+                }
+            }
+        }
+        logger.info(logLineSeparator);
+        logger.info("ORGANIZATION ADMINS {} DONE! OrgAdmins: {}", performDelete ? "DELETE" : "LIST", orgAdminsFound.get());
+        logger.info(logLineSeparator);
+
+    }
+
+
+    /**
+     * Processes one application: counts its application dictionary entries, walks all
+     * entities of all collections, walks its S3 assets and walks its Elasticsearch documents.
+     * When {@code performDelete} is set the application's client secret is rotated first and
+     * the entities (through EntityDeleteAction), assets and Elasticsearch documents are
+     * deleted while they are walked; otherwise they are only counted and optionally logged.
+     *
+     * @param appId         UUID of the application
+     * @param orgAppName    "org/app" name, used for logging only
+     * @param deletedApp    whether the application came from the deleted-applications list (logging only)
+     * @param em            entity manager of the application
+     * @param performDelete {@code false} only lists what would be affected
+     * @param bucketName    S3 bucket holding the assets, stored under the "appId/" key prefix
+     * @param logEachItem   log every dictionary entry, asset and Elasticsearch document found
+     * @throws RuntimeException when an Elasticsearch bulk delete reports failures
+     */
+    private void handleApp(UUID appId, String orgAppName, boolean deletedApp,
+                           EntityManager em, boolean performDelete, String bucketName,
+                           boolean logEachItem) {
+        logger.info(logLineSeparator);
+        logger.info("APPLICATION: {}({}){}", orgAppName, appId.toString(), deletedApp ? " - DELETED" : "");
+        logger.info(logLineSeparator);
+
+        if (performDelete) {
+            // generate a new client secret so the previous one stops being valid;
+            // a failure here is logged and does not stop the rest of the run
+            try {
+                String clientId = managementService.getClientIdForApplication(appId);
+                String oldClientSecret = managementService.getClientSecretForApplication(appId);
+                logger.info(logLineSeparator);
+                logger.info("OLD APP CLIENT ID: {}", clientId);
+                logger.info("OLD APP CLIENT SECRET: {}", oldClientSecret);
+                String newClientSecret = managementService.newClientSecretForApplication(appId);
+                logger.info("NEW APP CLIENT SECRET: {}", newClientSecret);
+                logger.info(logLineSeparator);
+            } catch (Exception e) {
+                logger.error("FAILED TO CHANGE CREDENTIALS FOR APP " + orgAppName + ": " + e.getMessage(), e);
+            }
+        }
+
+        logger.info(logLineSeparator);
+        logger.info("FINDING APP DICTIONARIES");
+        logger.info(logLineSeparator);
+        // reset per application, like the other counters, so totals do not accumulate across apps
+        appDictionaryEntriesFound.set(0);
+        // NOTE(review): dictionary entries are only counted here, never removed, even when performDelete is set
+        try {
+            EntityManager rootEm = emf.getEntityManager( emf.getManagementAppId() );
+
+            Application application = em.getApplication();
+
+            for ( String dictionary : rootEm.getDictionaries( application ) ) {
+                try {
+                    Map<Object, Object> dictMap = rootEm.getDictionaryAsMap(application, dictionary, false);
+                    for (Object key : dictMap.keySet()) {
+                        appDictionaryEntriesFound.incrementAndGet();
+                        if (logEachItem) {
+                            logger.info("APP DICTIONARY {} ENTRY: ({})", dictionary, key.toString());
+                        }
+                    }
+                }
+                catch (Exception e) {
+                    // best effort: skip a dictionary that cannot be read, but leave a trace
+                    logger.warn("APP DICTIONARY " + dictionary + " COULD NOT BE READ: " + e.getMessage(), e);
+                }
+            }
+        }
+        catch (Exception e) {
+            logger.error("APP DICTIONARY CHECK FOR APP " + orgAppName + " FAILED: " + e.getMessage(), e);
+        }
+        logger.info(logLineSeparator);
+        logger.info("APP DICTIONARIES {} DONE! App Dictionary Entries found: {}", performDelete ? "DELETE" : "LIST", appDictionaryEntriesFound.get());
+        logger.info(logLineSeparator);
+
+        logger.info(logLineSeparator);
+        logger.info("FINDING ENTITIES");
+        logger.info(logLineSeparator);
+        entitiesFound.set(0);
+        Observable<String> collectionsObservable = Observable.create( new CollectionsObservable( em ) );
+
+        // one entity stream per collection, each subscribed on the delete scheduler;
+        // toBlocking() makes this method wait until every stream has completed
+        collectionsObservable.flatMap( collection ->
+            Observable.create( new EntityObservable( em, collection ) )
+                .doOnNext( new EntityDeleteAction(em, performDelete, logEachItem) ).subscribeOn(deleteScheduler)
+        ).doOnCompleted( new EntityDeleteWrapUpAction(performDelete) ).toBlocking().lastOrDefault(null);
+
+
+        logger.info(logLineSeparator);
+        logger.info("FINDING ASSETS");
+        logger.info(logLineSeparator);
+        assetsFound.set(0);
+
+        ObjectListing listing = null;
+        try {
+            listing = s3Client.listObjects(bucketName, appId.toString() + "/");
+        }
+        catch (Exception e) {
+            logger.error("FAILED TO RETRIEVE ASSETS: ", e);
+        }
+        // a listing is returned in pages; keep fetching until the last page has been handled
+        while (listing != null) {
+            for (S3ObjectSummary summary : listing.getObjectSummaries()) {
+                String assetKey = summary.getKey();
+                assetsFound.getAndIncrement();
+                if (logEachItem) {
+                    logger.info("ASSET: {}", assetKey);
+                }
+                if (performDelete) {
+                    try {
+                        s3Client.deleteObject(bucketName, assetKey);
+                    }
+                    catch (Exception e) {
+                        logger.error("FAILED TO DELETE ASSET: " + assetKey, e);
+                    }
+                }
+            }
+            if (!listing.isTruncated()) {
+                break;
+            }
+            try {
+                listing = s3Client.listNextBatchOfObjects(listing);
+            }
+            catch (Exception e) {
+                logger.error("FAILED TO RETRIEVE ASSETS: ", e);
+                listing = null;
+            }
+        }
+        logger.info(logLineSeparator);
+        logger.info("Asset {} DONE! Assets: {}", performDelete ? "DELETE" : "LIST", assetsFound.get());
+        logger.info(logLineSeparator);
+
+        // Elasticsearch docs
+        logger.info(logLineSeparator);
+        logger.info("FINDING ES DOCS");
+        logger.info(logLineSeparator);
+        esDocsFound.set(0);
+
+        // scroll over every document, in any index, whose applicationId field matches this application
+        QueryBuilder qb = QueryBuilders.matchQuery("applicationId", "appId(" + appId.toString() + ",application)");
+        SearchResponse scrollResponse = esProvider.getClient()
+            .prepareSearch(ALL_INDEXES)
+            .setScroll(SCROLL_TIMEOUT)
+            .setSearchType(SearchType.SCAN)
+            .setQuery(qb)
+            .setSize(SCROLL_SIZE)
+            .setNoFields()
+            .execute().actionGet();
+
+        while (true) {
+            BulkRequestBuilder bulkRequest = null;
+            if (performDelete) {
+                bulkRequest = esProvider.getClient().prepareBulk();
+            }
+            boolean docsToDelete = false;
+            for (SearchHit hit : scrollResponse.getHits().getHits()) {
+                esDocsFound.getAndIncrement();
+                if (logEachItem) {
+                    logger.info("ES DOC: {}", hit.getId());
+                }
+                if (performDelete) {
+                    docsToDelete = true;
+                    bulkRequest.add(esProvider.getClient()
+                        .prepareDelete(hit.getIndex(), hit.getType(), hit.getId()));
+                }
+            }
+
+            // an empty bulk request is never executed; this also covers a scroll page without hits
+            if (docsToDelete) {
+                BulkResponse bulkResponse = bulkRequest.execute().actionGet();
+                if (bulkResponse.hasFailures()) {
+                    throw new RuntimeException(bulkResponse.buildFailureMessage());
+                }
+            }
+
+            scrollResponse = esProvider.getClient().prepareSearchScroll(scrollResponse.getScrollId())
+                .setScroll(SCROLL_TIMEOUT).execute().actionGet();
+
+            if (scrollResponse.getHits().getHits().length == 0) {
+                break;
+            }
+        }
+        logger.info(logLineSeparator);
+        logger.info("ES Doc {} DONE! ES Docs: {}", performDelete ? "DELETE" : "LIST", esDocsFound.get());
+        logger.info(logLineSeparator);
+
+    }
+
+
+
+ // ----------------------------------------------------------------------------------------
+ // reading data
+
+
+    /**
+     * Emits the collection names found in the application of the given entity manager.
+     *
+     * Each subscription reads the application's collection metadata, emits every collection
+     * name and then completes. If reading or emitting fails, the subscriber receives
+     * {@code onError} as the only terminal event; {@code onCompleted} is not sent afterwards.
+     */
+    private class CollectionsObservable implements Observable.OnSubscribe<String> {
+        private final EntityManager em;
+
+        public CollectionsObservable(EntityManager em) {
+            this.em = em;
+        }
+
+        @Override
+        public void call(Subscriber<? super String> subscriber) {
+
+            int count = 0;
+            try {
+                Map<String, Object> collectionMetadata = em.getApplicationCollectionMetadata();
+
+                logger.debug( "Emitting {} collection names for application {}",
+                    collectionMetadata.size(), em.getApplication().getName() );
+
+                for ( String collection : collectionMetadata.keySet() ) {
+                    subscriber.onNext( collection );
+                    count++;
+                }
+
+            } catch (Exception e) {
+                // onError is terminal: stop here instead of also signalling completion
+                subscriber.onError( e );
+                return;
+            }
+
+            subscriber.onCompleted();
+            logger.info( "Completed. Read {} collection names", count );
+        }
+    }
+
+
+ /**
+  * Emits the entities of one collection, each wrapped in an {@code ExportEntity} together
+  * with its non-empty dictionaries.
+  *
+  * Entities are read page by page ({@code MAX_ENTITY_FETCH} per query) by following the
+  * result cursor. A failure while handling a single entity is logged and that entity is
+  * skipped; any other failure ends the stream with {@code onError}.
+  */
+ private class EntityObservable implements Observable.OnSubscribe<ExportEntity> {
+     EntityManager em;
+     String collection;
+
+     public EntityObservable(EntityManager em, String collection) {
+         this.em = em;
+         this.collection = collection;
+     }
+
+     /**
+      * Reads the whole collection and pushes one {@code ExportEntity} per entity.
+      *
+      * @param subscriber receiver of the entities and of the terminal event
+      */
+     @Override
+     public void call(Subscriber<? super ExportEntity> subscriber) {
+
+         logger.info("Starting to fetch entities of collection {}", collection);
+
+         try {
+             int count = 0;
+
+             Query query = new Query();
+             query.setLimit( MAX_ENTITY_FETCH );
+
+             Results results = em.searchCollection( em.getApplicationRef(), collection, query );
+
+             while (results.size() > 0) {
+                 for (Entity entity : results.getEntities()) {
+                     try {
+                         // collect only the dictionaries that actually hold entries
+                         Set<String> dictionaries = em.getDictionaries( entity );
+                         Map dictionariesByName = new HashMap<String, Map<Object, Object>>();
+                         for (String dictionary : dictionaries) {
+                             Map<Object, Object> dict = em.getDictionaryAsMap( entity, dictionary );
+                             if (dict.isEmpty()) {
+                                 continue;
+                             }
+                             dictionariesByName.put( dictionary, dict );
+                         }
+
+                         ExportEntity exportEntity = new ExportEntity(
+                             organizationName,
+                             applicationName,
+                             entity,
+                             dictionariesByName );
+
+                         subscriber.onNext( exportEntity );
+                         count++;
+
+                     } catch (Exception e) {
+                         // Best effort: skip this entity, but keep the cause and its stack
+                         // trace in the log (a trailing Throwable argument is logged as such).
+                         logger.error("Error reading entity {} from collection {}",
+                             entity.getUuid(), collection, e);
+                     }
+                 }
+                 // no cursor means this was the last page
+                 if (results.getCursor() == null) {
+                     break;
+                 }
+                 query.setCursor( results.getCursor() );
+                 results = em.searchCollection( em.getApplicationRef(), collection, query );
+             }
+
+             subscriber.onCompleted();
+             logger.info("Completed collection {}. Read {} entities", collection, count);
+
+         } catch ( Exception e ) {
+             subscriber.onError(e);
+         }
+     }
+ }
+
+
+ // ----------------------------------------------------------------------------------------
+ // writing data
+
+
+ /**
+  * Counts each received entity and its dictionary entries, optionally logs them, and
+  * deletes the entity when {@code performDelete} is set.
+  *
+  * Any failure is rethrown as a {@link RuntimeException} that carries the original cause.
+  */
+ private class EntityDeleteAction implements Action1<ExportEntity> {
+
+     EntityManager em;
+     boolean performDelete;
+     boolean logEachEntity;
+
+     public EntityDeleteAction(EntityManager em, boolean performDelete, boolean logEachEntity) {
+         this.em = em;
+         this.performDelete = performDelete;
+         this.logEachEntity = logEachEntity;
+     }
+
+     /**
+      * Handles one entity: count, optional logging, optional delete.
+      *
+      * @param entity the entity (with its dictionary names) to process
+      * @throws RuntimeException if reading the dictionaries or deleting the entity fails
+      */
+     @Override
+     public void call(ExportEntity entity) {
+
+         try {
+             entitiesFound.getAndIncrement();
+             if (logEachEntity) {
+                 logger.info("ENTITY: {}", entity.getEntity().asId());
+             }
+
+             // count (and optionally log) every entry of every dictionary of the entity
+             for (String dictionaryName : entity.getDictionaries().keySet()) {
+                 Map<Object, Object> dictMap = em.getDictionaryAsMap(entity.getEntity(), dictionaryName);
+                 for (Map.Entry<Object, Object> entry : dictMap.entrySet()) {
+                     entityDictionaryEntriesFound.incrementAndGet();
+                     if (logEachEntity) {
+                         // Hand the objects to the logger instead of calling toString() on
+                         // them: a null key or value is then printed as "null" rather than
+                         // aborting the run with a NullPointerException.
+                         logger.info("ENTITY DICTIONARY ENTRY ({}-{}): ({}): ({})",
+                             entity.getEntity().asId(),
+                             dictionaryName, entry.getKey(),
+                             entry.getValue());
+                     }
+                 }
+             }
+
+             if (performDelete) {
+                 em.delete(entity.getEntity());
+             }
+
+         } catch (IOException e) {
+             throw new RuntimeException("Error deleting entity (IOException): " + e.getMessage(), e);
+         } catch (Exception e) {
+             throw new RuntimeException("Error deleting entity: " + e.getMessage(), e);
+         }
+     }
+ }
+
+
+ /**
+  * Action that logs the number of entities found, labelled DELETE or LIST depending on
+  * {@code performDelete}, framed by two separator lines.
+  */
+ private class EntityDeleteWrapUpAction implements Action0 {
+
+     boolean performDelete;
+
+     public EntityDeleteWrapUpAction(boolean performDelete) {
+         this.performDelete = performDelete;
+     }
+
+     @Override
+     public void call() {
+         // label of the mode that has just finished
+         final String mode;
+         if (performDelete) {
+             mode = "DELETE";
+         } else {
+             mode = "LIST";
+         }
+
+         logger.info(logLineSeparator);
+         logger.info("Entity {} DONE! Entities: {}", mode, entitiesFound.get());
+         logger.info(logLineSeparator);
+     }
+ }
+}
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/CollectionIterator.java b/stack/tools/src/main/java/org/apache/usergrid/tools/CollectionIterator.java
index 26b5f5f..bfe5edf 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/CollectionIterator.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/CollectionIterator.java
@@ -23,23 +23,16 @@
import com.google.common.base.Optional;
import com.netflix.astyanax.MutationBatch;
-import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
-import org.apache.usergrid.corepersistence.results.EntityQueryExecutor;
-import org.apache.usergrid.corepersistence.results.IdQueryExecutor;
-import org.apache.usergrid.corepersistence.service.CollectionSearch;
import org.apache.usergrid.corepersistence.service.CollectionService;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.*;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.graph.*;
-import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
-import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
import org.apache.usergrid.persistence.index.utils.UUIDUtils;
import org.apache.usergrid.persistence.model.entity.*;
-import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.schema.CollectionInfo;
import org.apache.usergrid.utils.InflectionUtils;
import static org.apache.commons.lang.StringUtils.isBlank;
@@ -63,7 +56,9 @@
private static final String ENTITY_TYPE_ARG = "entityType";
- private static final String REMOVE_CONNECTIONS_ARG = "removeConnections";
+ private static final String REMOVE_DUPLICATE_CONNECTIONS_ARG = "removeDuplicateConnections";
+
+ private static final String REMOVE_ORPHAN_CONNECTIONS_ARG = "removeOrphanConnections";
private static final String LATEST_TIMESTAMP_ARG = "latestTimestamp";
@@ -95,11 +90,17 @@
options.addOption( collectionOption );
- Option removeConnectionsOption =
- OptionBuilder.withArgName(REMOVE_CONNECTIONS_ARG).hasArg().isRequired( false ).withDescription( "remove orphaned connections" )
- .create(REMOVE_CONNECTIONS_ARG);
+ Option removeOrphanConnectionsOption =
+ OptionBuilder.withArgName(REMOVE_ORPHAN_CONNECTIONS_ARG).hasArg().isRequired( false ).withDescription( "remove orphaned connections" )
+ .create(REMOVE_ORPHAN_CONNECTIONS_ARG);
- options.addOption( removeConnectionsOption );
+ options.addOption( removeOrphanConnectionsOption );
+
+ Option removeDuplicateConnectionsOption =
+ OptionBuilder.withArgName(REMOVE_DUPLICATE_CONNECTIONS_ARG).hasArg().isRequired( false ).withDescription( "remove duplicate connections" )
+ .create(REMOVE_DUPLICATE_CONNECTIONS_ARG);
+
+ options.addOption( removeDuplicateConnectionsOption );
Option earliestTimestampOption =
OptionBuilder.withArgName(EARLIEST_TIMESTAMP_ARG).hasArg().isRequired( false ).withDescription( "earliest timestamp to delete" )
@@ -137,7 +138,8 @@
String applicationOption = line.getOptionValue(APPLICATION_ARG);
String entityTypeOption = line.getOptionValue(ENTITY_TYPE_ARG);
- String removeConnectionsOption = line.getOptionValue(REMOVE_CONNECTIONS_ARG);
+ String removeOrphanConnectionsOption = line.getOptionValue(REMOVE_ORPHAN_CONNECTIONS_ARG);
+ String removeDuplicateConnectionsOption = line.getOptionValue(REMOVE_DUPLICATE_CONNECTIONS_ARG);
String earliestTimestampOption = line.getOptionValue(EARLIEST_TIMESTAMP_ARG);
String latestTimestampOption = line.getOptionValue(LATEST_TIMESTAMP_ARG);
String secondsInPastOption = line.getOptionValue(SECONDS_IN_PAST_ARG);
@@ -152,7 +154,8 @@
}
String entityType = entityTypeOption;
- final boolean removeOrphans = !isBlank(removeConnectionsOption) && removeConnectionsOption.toLowerCase().equals("yes");
+ final boolean removeOrphans = !isBlank(removeOrphanConnectionsOption) && removeOrphanConnectionsOption.toLowerCase().equals("yes");
+ final boolean removeDuplicates = !isBlank(removeDuplicateConnectionsOption) && removeDuplicateConnectionsOption.toLowerCase().equals("yes");
if (!isBlank(secondsInPastOption) && !isBlank(latestTimestampOption)) {
throw new RuntimeException("Can't specify both latest timestamp and seconds in past options.");
@@ -222,32 +225,63 @@
new SimpleSearchByEdgeType( applicationScopeId, simpleEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
Optional.absent(), false );
+ Set<UUID> uuidSet = new HashSet<>();
+
gm.loadEdgesFromSource(search).map(markedEdge -> {
UUID uuid = markedEdge.getTargetNode().getUuid();
+ long edgeTimestamp = markedEdge.getTimestamp();
+ String edgeType = markedEdge.getType();
+ boolean duplicate = uuidSet.contains(uuid);
+ if (!duplicate) {
+ uuidSet.add(uuid);
+ }
try {
EntityRef entityRef = new SimpleEntityRef(entityType, uuid);
org.apache.usergrid.persistence.Entity retrieved = em.get(entityRef);
long timestamp = 0;
+ DateFormat df = new SimpleDateFormat();
+ df.setTimeZone(TimeZone.getTimeZone("GMT"));
String dateString = "NOT TIME-BASED";
if (UUIDUtils.isTimeBased(uuid)){
timestamp = UUIDUtils.getTimestampInMillis(uuid);
Date uuidDate = new Date(timestamp);
- DateFormat df = new SimpleDateFormat();
- df.setTimeZone(TimeZone.getTimeZone("GMT"));
dateString = df.format(uuidDate) + " GMT";
}
+ Date uuidEdgeDate = new Date(UUIDUtils.getUnixTimestampInMillisFromUUIDTimestamp(edgeTimestamp));
+ String edgeDateString = df.format(uuidEdgeDate) + " GMT";
+
if ( retrieved != null ){
- logger.info("{} - {} - entity data found", uuid, dateString);
+ if (duplicate) {
+ if (removeDuplicates) {
+ logger.info("DUPLICATE ENTITY (REMOVING): uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{}",
+ uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString);
+ try {
+ MutationBatch batch = edgeSerialization.deleteEdge(applicationScope, markedEdge, UUIDUtils.newTimeUUID());
+ logger.info("BATCH: {}", batch);
+ batch.execute();
+ } catch (Exception e) {
+ logger.error("{} - exception while trying to remove orphaned connection, {}", uuid, e.getMessage());
+ }
+ } else {
+ logger.info("DUPLICATE ENTITY (WON'T REMOVE): uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{}",
+ uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString);
+ }
+
+ } else {
+ logger.info("ENTITY: uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{}",
+ uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString);
+ }
+
+
}else{
if (removeOrphans && timestamp >= earliestTimestamp && timestamp <= latestTimestamp) {
- logger.info("{} - {} - entity data NOT found, REMOVING", uuid, dateString);
+ logger.info("NOT FOUND (REMOVING): uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{} - isDeleted:{} isSourceNodeDeleted:{} isTargetNodeDeleted:{}",
+ uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString, markedEdge.isDeleted(), markedEdge.isSourceNodeDelete(), markedEdge.isTargetNodeDeleted());
try {
- //em.removeItemFromCollection(headEntity, collectionName, entityRef );
- logger.info("entityRef.asId(): {}", entityRef.asId());
MutationBatch batch = edgeSerialization.deleteEdge(applicationScope, markedEdge, UUIDUtils.newTimeUUID());
logger.info("BATCH: {}", batch);
batch.execute();
@@ -255,9 +289,11 @@
logger.error("{} - exception while trying to remove orphaned connection, {}", uuid, e.getMessage());
}
} else if (removeOrphans) {
- logger.info("{} - {} ({}) - entity data NOT found, not removing because timestamp not in range", uuid, dateString, timestamp);
+ logger.info("NOT FOUND (TIMESTAMP OUT OF RANGE): uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{} - isDeleted:{} isSourceNodeDeleted:{} isTargetNodeDeleted:{}",
+ uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString, markedEdge.isDeleted(), markedEdge.isSourceNodeDelete(), markedEdge.isTargetNodeDeleted());
} else {
- logger.info("{} - {} ({}) - entity data NOT found", uuid, dateString, timestamp);
+ logger.info("NOT FOUND: uuid:{} edgeTimestamp:{}({}) edgeType:{} timestamp:{} uuidDate:{} - isDeleted:{} isSourceNodeDeleted:{} isTargetNodeDeleted:{}",
+ uuid, edgeTimestamp, edgeDateString, edgeType, timestamp, dateString, markedEdge.isDeleted(), markedEdge.isSourceNodeDelete(), markedEdge.isTargetNodeDeleted());
}
}
} catch (Exception e) {
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java b/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
index 821d011..b07d09d 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
@@ -17,378 +17,195 @@
package org.apache.usergrid.tools;
+import static org.apache.usergrid.management.AccountCreationProps.PROPERTIES_USERGRID_BINARY_UPLOADER;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedWriter;
import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
-
-import com.fasterxml.jackson.core.JsonEncoding;
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.usergrid.persistence.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.usergrid.management.OrganizationInfo;
-import org.apache.usergrid.management.UserInfo;
-import org.apache.usergrid.tools.bean.ExportOrg;
-import org.apache.usergrid.utils.JsonUtils;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.usergrid.corepersistence.ManagerCache;
+import org.apache.usergrid.corepersistence.export.ExportRequestBuilder;
+import org.apache.usergrid.corepersistence.export.ExportRequestBuilderImpl;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.management.OrganizationInfo;
+import org.apache.usergrid.management.UserInfo;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.SimpleEntityRef;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.services.assets.BinaryStoreFactory;
+import org.apache.usergrid.services.assets.data.BinaryStore;
+import org.apache.usergrid.tools.bean.ExportOrg;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.base.Optional;
import com.google.common.collect.BiMap;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import rx.Observable;
+import rx.observables.ConnectableObservable;
+import rx.schedulers.Schedulers;
public class Export extends ExportingToolBase {
static final Logger logger = LoggerFactory.getLogger( Export.class );
+ public static final String LAST_ID = "lastId";
+
+
+ @Autowired
+ private BinaryStoreFactory binaryStoreFactory;
JsonFactory jsonFactory = new JsonFactory();
-
+
+ private AllEntityIdsObservable allEntityIdsObs;
+ private SimpleEdge lastEdge = null;
+
+ //TODO : Add blocking queues for these executors where appropriate
+ private ExecutorService orgAppCollParallelizer = Executors.newFixedThreadPool(3);
+ private ExecutorService entityFetcher = Executors.newFixedThreadPool(10);
+ private ExecutorService enitityMemberFetcher = Executors.newFixedThreadPool(10);
+ private ExecutorService assetsFetcher = Executors.newFixedThreadPool(10);
+
@Override
- public void runTool( CommandLine line ) throws Exception {
- startSpring();
+ @SuppressWarnings("static-access")
+ public Options createOptions() {
+     // Take the options of the parent tool and add the optional "lastId" argument on top.
+     final Options toolOptions = super.createOptions();
+
+     toolOptions.addOption(
+         OptionBuilder.withArgName( LAST_ID )
+             .hasArg()
+             .withDescription( "Last Entity Id to resume from" )
+             .create( LAST_ID ) );
+
+     return toolOptions;
+ }
+
+ /**
+ * Runs the export: starts Spring, applies the export parameters, prepares the output
+ * directory and exports the organizations.
+ *
+ * When lastEdgeJson is set, lastEdge is rebuilt from its "sourceNode", "targetNode",
+ * "type" and "timestamp" fields. After the export has been started the method keeps
+ * sleeping so the program does not end; it returns only when the thread is interrupted.
+ *
+ * @param line the parsed command line
+ * @throws Exception if setup or the organization export fails
+ */
+ @Override
+ public void runTool( CommandLine line ) throws Exception {
+
+ startSpring();
setVerbose( line );
- // ExportDataCreator dataCreator = new ExportDataCreator(emf,
- // managementService);
- // dataCreator.createTestData();
-
- applyOrgId( line );
+
+ this.allEntityIdsObs = injector.getInstance(AllEntityIdsObservable.class);
+ applyExportParams(line);
prepareBaseOutputFileName( line );
+
+
+ if(lastEdgeJson != null) {
+ JSONObject lastEdgeJsonObj = new JSONObject(lastEdgeJson);
+ UUID uuid = UUID.fromString(lastEdgeJsonObj.getJSONObject("sourceNode").getString("uuid"));
+ Id sourceId = new SimpleId(uuid, lastEdgeJsonObj.getJSONObject("sourceNode").getString("type"));
+ uuid = UUID.fromString(lastEdgeJsonObj.getJSONObject("targetNode").getString("uuid"));
+ Id targetId = new SimpleId(uuid, lastEdgeJsonObj.getJSONObject("targetNode").getString("type"));
+ lastEdge = new SimpleEdge(sourceId, lastEdgeJsonObj.getString("type"), targetId, lastEdgeJsonObj.getLong("timestamp"));
+ }
+
outputDir = createOutputParentDir();
logger.info( "Export directory: " + outputDir.getAbsolutePath() );
+
// Export organizations separately.
exportOrganizations();
+
+ logger.info("Finished export waiting for threads to end.");
- // Loop through the organizations
- Map<UUID, String> organizations = getOrgs();
- for ( Entry<UUID, String> organization : organizations.entrySet() ) {
+ while(true) {
+ try {
+ //Spinning to prevent program execution from ending.
+ //Need to replace with some kind of countdown latch or task tracker
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ logger.error("Exception while waiting for export to complete.",e);
+ // An interrupt is a request to stop: restore the flag for the caller and
+ // leave the wait loop instead of sleeping again.
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
- if ( organization.equals( properties.getProperty( "usergrid.test-account.organization" ) ) ) {
- // Skip test data from being exported.
- continue;
- }
-
- exportApplicationsForOrg( organization );
- }
}
-
-
- private Map<UUID, String> getOrgs() throws Exception {
- // Loop through the organizations
- Map<UUID, String> organizationNames = null;
-
- if ( orgId == null ) {
- organizationNames = managementService.getOrganizations();
- }
-
- else {
- OrganizationInfo info = managementService.getOrganizationByUuid( orgId );
-
- if ( info == null ) {
- logger.error( "Organization info is null!" );
- System.exit( 1 );
- }
-
- organizationNames = new HashMap<UUID, String>();
- organizationNames.put( orgId, info.getName() );
- }
-
-
- return organizationNames;
- }
-
-
- private void exportApplicationsForOrg( Entry<UUID, String> organization ) throws Exception {
- logger.info( "" + organization );
-
- // Loop through the applications per organization
- BiMap<UUID, String> applications = managementService.getApplicationsForOrganization( organization.getKey() );
- for ( Entry<UUID, String> application : applications.entrySet() ) {
-
- logger.info( application.getValue() + " : " + application.getKey() );
-
- // Get the JSon serializer.
- com.fasterxml.jackson.core.JsonGenerator jg =
- getJsonGenerator( createOutputFile( "application", application.getValue() ) );
-
- // load the dictionary
- EntityManager rootEm = emf.getEntityManager( emf.getManagementAppId() );
-
- Entity appEntity = rootEm.get( new SimpleEntityRef( "application", application.getKey()));
-
- Map<String, Object> dictionaries = new HashMap<String, Object>();
-
- for ( String dictionary : rootEm.getDictionaries( appEntity ) ) {
- Map<Object, Object> dict = rootEm.getDictionaryAsMap( appEntity, dictionary );
-
- // nothing to do
- if ( dict.isEmpty() ) {
- continue;
- }
-
- dictionaries.put( dictionary, dict );
- }
-
- EntityManager em = emf.getEntityManager( application.getKey() );
-
- // Get application
- Entity nsEntity = em.get( new SimpleEntityRef( "application", application.getKey()));
-
- Set<String> collections = em.getApplicationCollections();
-
- // load app counters
-
- Map<String, Long> entityCounters = em.getApplicationCounters();
-
- nsEntity.setMetadata( "organization", organization );
- nsEntity.setMetadata( "dictionaries", dictionaries );
- // counters for collections
- nsEntity.setMetadata( "counters", entityCounters );
- nsEntity.setMetadata( "collections", collections );
-
- jg.writeStartArray();
- jg.writeObject( nsEntity );
-
- // Create a GENERATOR for the application collections.
- JsonGenerator collectionsJg =
- getJsonGenerator( createOutputFile( "collections", application.getValue() ) );
- collectionsJg.writeStartObject();
-
- Map<String, Object> metadata = em.getApplicationCollectionMetadata();
- echo( JsonUtils.mapToFormattedJsonString( metadata ) );
-
- // Loop through the collections. This is the only way to loop
- // through the entities in the application (former namespace).
- for ( String collectionName : metadata.keySet() ) {
-
- Query query = new Query();
- query.setLimit( MAX_ENTITY_FETCH );
- query.setResultsLevel( Query.Level.ALL_PROPERTIES );
-
- Results entities = em.searchCollection( em.getApplicationRef(), collectionName, query );
-
- while ( entities.size() > 0 ) {
-
- for ( Entity entity : entities ) {
- // Export the entity first and later the collections for
- // this entity.
- jg.writeObject( entity );
- echo( entity );
-
- saveCollectionMembers( collectionsJg, em, application.getValue(), entity );
- }
-
- //we're done
- if ( entities.getCursor() == null ) {
- break;
- }
-
-
- query.setCursor( entities.getCursor() );
-
- entities = em.searchCollection( em.getApplicationRef(), collectionName, query );
- }
- }
-
- // Close writer for the collections for this application.
- collectionsJg.writeEndObject();
- collectionsJg.close();
-
- // Close writer and file for this application.
- jg.writeEndArray();
- jg.close();
- }
- }
-
-
- /**
- * Serialize and save the collection members of this <code>entity</code>
- *
- * @param em Entity Manager
- * @param application Application name
- * @param entity entity
- */
- private void saveCollectionMembers( JsonGenerator jg, EntityManager em, String application, Entity entity )
- throws Exception {
-
- Set<String> collections = em.getCollections( entity );
-
- // Only create entry for Entities that have collections
- if ( ( collections == null ) || collections.isEmpty() ) {
- return;
- }
-
- jg.writeFieldName( entity.getUuid().toString() );
- jg.writeStartObject();
-
- for ( String collectionName : collections ) {
-
- jg.writeFieldName( collectionName );
- // Start collection array.
- jg.writeStartArray();
-
- Results collectionMembers = em.getCollection( entity, collectionName, null, 100000, Query.Level.IDS, false );
-
- List<UUID> entityIds = collectionMembers.getIds();
-
- if ( ( entityIds != null ) && !entityIds.isEmpty() ) {
- for ( UUID childEntityUUID : entityIds ) {
- jg.writeObject( childEntityUUID.toString() );
- }
- }
-
- // End collection array.
- jg.writeEndArray();
- }
-
- // Write connections
- saveConnections( entity, em, jg );
-
- // Write dictionaries
- saveDictionaries( entity, em, jg );
-
- // End the object if it was Started
- jg.writeEndObject();
- }
-
-
- /** Persists the connection for this entity. */
- private void saveDictionaries( Entity entity, EntityManager em, JsonGenerator jg ) throws Exception {
-
- jg.writeFieldName( "dictionaries" );
- jg.writeStartObject();
-
- Set<String> dictionaries = em.getDictionaries( entity );
- for ( String dictionary : dictionaries ) {
-
- Map<Object, Object> dict = em.getDictionaryAsMap( entity, dictionary );
-
- // nothing to do
- if ( dict.isEmpty() ) {
- continue;
- }
-
- jg.writeFieldName( dictionary );
-
- jg.writeStartObject();
-
- for ( Entry<Object, Object> entry : dict.entrySet() ) {
- jg.writeFieldName( entry.getKey().toString() );
- jg.writeObject( entry.getValue() );
- }
-
- jg.writeEndObject();
- }
- jg.writeEndObject();
- }
-
-
- /** Persists the connection for this entity. */
- private void saveConnections( Entity entity, EntityManager em, JsonGenerator jg ) throws Exception {
-
- jg.writeFieldName( "connections" );
- jg.writeStartObject();
-
- Set<String> connectionTypes = em.getConnectionTypes( entity );
- for ( String connectionType : connectionTypes ) {
-
- jg.writeFieldName( connectionType );
- jg.writeStartArray();
-
- Results results = em.getTargetEntities(
- entity, connectionType, null, Query.Level.IDS );
-
- List<ConnectionRef> connections = results.getConnections();
-
- for ( ConnectionRef connectionRef : connections ) {
- jg.writeObject( connectionRef.getTargetRefs().getUuid() );
- }
-
- jg.writeEndArray();
- }
- jg.writeEndObject();
- }
-
- /*-
- * Set<String> collections = em.getCollections(entity);
- * for (String collection : collections) {
- * Results collectionMembers = em.getCollection(
- * entity, collection, null,
- * MAX_ENTITY_FETCH, Level.IDS, false);
- * write entity_id : { "collectionName" : [ids]
- * }
- * }
- *
- *
- * {
- * entity_id :
- * { collection_name :
- * [
- * collected_entity_id,
- * collected_entity_id
- * ]
- * },
- * f47ac10b-58cc-4372-a567-0e02b2c3d479 :
- * { "activtites" :
- * [
- * f47ac10b-58cc-4372-a567-0e02b2c3d47A,
- * f47ac10b-58cc-4372-a567-0e02b2c3d47B
- * ]
- * }
- * }
- *
- * http://jackson.codehaus.org/1.8.0/javadoc/org/codehaus/jackson/JsonGenerator.html
- *
- *
- *-
- * List<ConnectedEntityRef> connections = em.getConnections(entityId, query);
- */
-
-
+
+
+
+ /**
+ * Exports every organization returned by getOrgs(), except the configured test
+ * organization: writes the organization metadata (including admin user names) into the
+ * organization's directory and then exports its applications.
+ */
private void exportOrganizations() throws Exception, UnsupportedEncodingException {
+ for (Entry<UUID, String> organizationName : getOrgs().entrySet()) {
- for ( Entry<UUID, String> organizationName : getOrgs().entrySet() ) {
+ // Let's skip the test entities.
+ // Compare the organization name (the entry value): a Map.Entry never equals a String.
+ String testOrgName = properties.getProperty("usergrid.test-account.organization");
+ if (testOrgName != null && testOrgName.equals(organizationName.getValue())) {
+ continue;
+ }
- // Let's skip the test entities.
- if ( organizationName.equals( properties.getProperty( "usergrid.test-account.organization" ) ) ) {
- continue;
- }
+ OrganizationInfo orgInfo = managementService.getOrganizationByUuid(organizationName.getKey());
+ logger.info("Exporting Organization: " + orgInfo.getName());
- OrganizationInfo acc = managementService.getOrganizationByUuid( organizationName.getKey() );
- logger.info( "Exporting Organization: " + acc.getName() );
+ ExportOrg exportOrg = new ExportOrg(orgInfo);
- ExportOrg exportOrg = new ExportOrg( acc );
+ List<UserInfo> users = managementService.getAdminUsersForOrganization(organizationName.getKey());
- List<UserInfo> users = managementService.getAdminUsersForOrganization( organizationName.getKey() );
+ for (UserInfo user : users) {
+ exportOrg.addAdmin(user.getUsername());
+ }
- for ( UserInfo user : users ) {
- exportOrg.addAdmin( user.getUsername() );
- }
+ File orgDir = createOrgDir(orgInfo.getName());
- // One file per Organization.
- saveOrganizationInFile( exportOrg );
- }
- }
+ // One file per Organization.
+ saveOrganizationMetadata(orgDir, exportOrg);
+ exportApplicationsForOrg(orgDir, organizationName.getKey(), organizationName.getValue());
+ }
+ }
/**
* Serialize an Organization into a json file.
+ * @param orgDir directory in which the organization file is created
*
* @param acc OrganizationInfo
*/
- private void saveOrganizationInFile( ExportOrg acc ) {
+ private void saveOrganizationMetadata( File orgDir, ExportOrg acc ) {
+
try {
- File outFile = createOutputFile( "organization", acc.getName() );
+ File outFile = createOutputFile( orgDir, "organization", acc.getName() );
com.fasterxml.jackson.core.JsonGenerator jg = getJsonGenerator( outFile );
jg.writeObject( acc );
jg.close();
@@ -399,32 +216,503 @@
}
- public void streamOutput( File file, List<Entity> entities ) throws Exception {
- JsonFactory jsonFactory = new JsonFactory();
- // or, for data binding,
- // org.codehaus.jackson.mapper.MappingJsonFactory
- JsonGenerator jg = jsonFactory.createJsonGenerator( file, JsonEncoding.UTF8 );
- // or Stream, Reader
+    /**
+     * Determines which organizations take part in the export.
+     *
+     * When neither an organization id nor a non-blank organization name was
+     * configured, every organization known to the management service is
+     * returned. Otherwise the single requested organization is looked up (the
+     * id wins over the name); the tool terminates with exit status 1 when that
+     * lookup yields nothing.
+     *
+     * @return organization names keyed by organization UUID
+     * @throws Exception if the management service lookup fails
+     */
+    private Map<UUID, String> getOrgs() throws Exception {
+        boolean noOrgName = orgName == null || orgName.trim().equals("");
+
+        // no filter at all: export everything
+        if ( orgId == null && noOrgName ) {
+            return managementService.getOrganizations();
+        }
+
+        OrganizationInfo info = ( orgId != null )
+                ? managementService.getOrganizationByUuid( orgId )
+                : managementService.getOrganizationByName( orgName );
+
+        if ( info == null ) {
+            logger.error( "Organization info is null!" );
+            System.exit( 1 );
+        }
+
+        Map<UUID, String> selected = new HashMap<UUID, String>();
+        selected.put( info.getUuid(), info.getName() );
+        return selected;
+    }
+
+    /**
+     * Exports the applications of one organization into {@code orgDir}.
+     *
+     * Without an application filter ({@code applicationId} / {@code applicationName})
+     * every application of the organization is exported. With a filter only the
+     * matching application is exported; a filter that matches nothing is logged
+     * and skipped instead of being handed to {@link #exportApplication} as null.
+     *
+     * The management service reports applications as {@code "orgName/appName"};
+     * in both branches only the bare application name is passed on, so the
+     * output directory layout does not depend on how the application was selected.
+     *
+     * @param orgDir  directory of the organization being exported
+     * @param orgId   UUID of the organization
+     * @param orgName name of the organization
+     * @throws Exception if the application list cannot be loaded
+     */
+    private void exportApplicationsForOrg(File orgDir, UUID orgId, String orgName ) throws Exception {
+
+        logger.info("Exporting applications for {} : {} ",orgId, orgName);
+
+        // Loop through the applications per organization
+        BiMap<UUID, String> applications = managementService.getApplicationsForOrganization( orgId );
+
+        if ( applicationId == null && (applicationName == null || applicationName.trim().equals(""))) {
+            //export all apps as appId or name is not provided
+
+            Observable.from(applications.entrySet())
+                .subscribeOn(Schedulers.from(orgAppCollParallelizer))
+                .subscribe(appEntry -> {
+                    UUID appId = appEntry.getKey();
+                    String appName = appEntry.getValue().split("/")[1];
+                    try {
+                        exportApplication(orgDir, appId, appName);
+                    } catch (Exception e) {
+                        logger.error("There was an exception exporting application {} : {}",appName, appId, e);
+                    }
+                });
+            return;
+        }
+
+        UUID appId = applicationId;
+        String appName = applicationName;
+
+        if( appId != null ) {
+            // the map value is the qualified "org/app" name; keep only the app part,
+            // exactly as the export-all branch does
+            String qualifiedName = applications.get(appId);
+            appName = (qualifiedName == null) ? null : qualifiedName.split("/")[1];
+        }
+        else {
+            appId = applications.inverse().get(orgName+'/'+appName);
+        }
+
+        if ( appId == null || appName == null ) {
+            logger.error("Application {} : {} was not found in organization {}, nothing to export",
+                applicationName, applicationId, orgName);
+            return;
+        }
+
+        try {
+            exportApplication(orgDir, appId, appName);
+        } catch (Exception e) {
+            logger.error("There was an exception exporting application {} : {}",appName, appId, e);
+        }
+    }
+
+ /**
+ * Exports a single application: writes the application entity together with
+ * its dictionaries, counters and collection names into an "application" file
+ * below a per-application directory, then starts the export of its collections.
+ *
+ * @param orgDir directory of the owning organization
+ * @param appId UUID of the application to export
+ * @param appName name used for the application directory and output file
+ * @throws Exception if loading the application data or writing the file fails
+ */
+ private void exportApplication(File orgDir, UUID appId, String appName) throws Exception {
+
+
+ logger.info( "Starting application export for {} : {} ",appName, appId );
+ File appDir = createApplicationDir(orgDir, appName);
+
+ JsonGenerator jg =
+ getJsonGenerator( createOutputFile( appDir, "application", appName) );
+
+ // load the dictionary
+ // (dictionaries are read from the "org_application" entity in the management application)
+ EntityManager rootEm = emf.getEntityManager( emf.getManagementAppId() );
+
+ Entity appEntity = rootEm.get( new SimpleEntityRef( "org_application", appId));
+
+ Map<String, Object> dictionaries = new HashMap<String, Object>();
+
+ for ( String dictionary : rootEm.getDictionaries( appEntity ) ) {
+ Map<Object, Object> dict = rootEm.getDictionaryAsMap( appEntity, dictionary );
+
+ // nothing to do
+ if ( dict.isEmpty() ) {
+ continue;
+ }
+
+ dictionaries.put( dictionary, dict );
+ }
+
+ EntityManager em = emf.getEntityManager( appId);
+
+ // Get application
+ Entity nsEntity = em.get( new SimpleEntityRef( "application", appId));
+
+ Set<String> collections = em.getApplicationCollections();
+
+ // load app counters
+
+ Map<String, Long> entityCounters = em.getApplicationCounters();
+
+ //nsEntity.setMetadata( "organization", orgName );
+ nsEntity.setMetadata( "dictionaries", dictionaries );
+ // counters for collections
+ nsEntity.setMetadata( "counters", entityCounters );
+ nsEntity.setMetadata( "collections", collections );
+ // NOTE(review): the array opened below is not ended explicitly before close();
+ // presumably the generator closes open content itself - confirm.
 jg.writeStartArray();
- for ( Entity entity : entities ) {
- jg.writeObject( entity );
- }
- jg.writeEndArray();
-
+ jg.writeObject( nsEntity );
 jg.close();
- }
+
+ // without an explicit collection list every collection of the application is exported
+ if ( collNames == null || collNames.length <= 0) {
+ //export all collections as collection names are not provided
+
+ Observable.from(collections)
+ .subscribeOn(Schedulers.from(orgAppCollParallelizer))
+ .subscribe(collectionName -> {
+ exportCollection(appDir, appId, collectionName, em);
+ });
+
+ }
+ else {
+ // requested names that are not collections of this application are silently ignored
+ Observable.from(collNames)
+ .subscribeOn(Schedulers.from(orgAppCollParallelizer))
+ .subscribe(collectionName -> {
+ if(collections.contains(collectionName)) {
+ exportCollection(appDir, appId, collectionName, em);
+ }
+ });
+ }
+
+ }
- // to generate the activities and user relationship, follow this:
+    /**
+     * Exports one collection of an application: creates the collection
+     * directory below {@code appDir} and writes the collection content into it.
+     *
+     * @param appDir         directory of the application being exported
+     * @param appId          UUID of the application
+     * @param collectionName name of the collection to export
+     * @param em             entity manager of the application (not used by this method)
+     */
+    private void exportCollection(File appDir, UUID appId, String collectionName, EntityManager em) {
+        extractEntityIdsForCollection(createCollectionDir(appDir, collectionName), appId, collectionName);
+    }
+
+    /**
+     * Streams the edges of one collection in batches of 1000 and exports each batch.
+     *
+     * For every batch the edges are written to an id file, the target entities
+     * are loaded and written, and - unless disabled through the skip flags -
+     * their connections, dictionaries and assets are exported as well.
+     *
+     * Edges without a target node are dropped from a batch before it is
+     * processed; a batch that is empty afterwards is skipped.
+     *
+     * @param collectionDir  output directory of the collection
+     * @param applicationId  UUID of the application owning the collection
+     * @param collectionName name of the collection
+     */
+    private void extractEntityIdsForCollection(File collectionDir, UUID applicationId, String collectionName) {
+
+        AtomicInteger batch = new AtomicInteger(1);
+
+        final EntityManager rootEm = emf.getEntityManager(applicationId);
+        final Gson gson = new GsonBuilder().create();
+        ManagerCache managerCache = injector.getInstance(ManagerCache.class);
+        ExportRequestBuilder builder = new ExportRequestBuilderImpl().withApplicationId(applicationId);
+        final ApplicationScope appScope = builder.getApplicationScope().get();
+        GraphManager gm = managerCache.getGraphManager(appScope);
+        EntityCollectionManagerFactory entityCollectionManagerFactory = injector
+                .getInstance(EntityCollectionManagerFactory.class);
+        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(appScope);
- // write field name (id)
- // write start object
- // write field name (collection name)
- // write start array
- // write object/string
- // write another object
- // write end array
- // write end object
- // ...... more objects
- //
+        // single writer thread: batches of one collection are processed one after another
+        ExecutorService entityIdWriter = Executors.newFixedThreadPool(1);
+        allEntityIdsObs
+            .getEdgesToEntities(
+                Observable.just(CpNamingUtils.getApplicationScope(applicationId)),
+                Optional.fromNullable(CpNamingUtils.getEdgeTypeFromCollectionName(collectionName.toLowerCase())),
+                (this.lastEdge == null ? Optional.absent() : Optional.fromNullable(lastEdge)))
+            .buffer(1000)
+            .finallyDo(()-> {
+                entityIdWriter.shutdown();
+                logger.info("Finished fetching entity ids for {}. Shutting down entity id writer executor ", collectionName);
+                // interrupts are deliberately ignored here: keep waiting until
+                // every queued batch has been written
+                while(!entityIdWriter.isTerminated()) {
+                    try {
+                        entityIdWriter.awaitTermination(10, TimeUnit.SECONDS);
+                    } catch (InterruptedException e) {
+                    }
+                }
+                logger.info("Entity id writer executor terminated after shutdown for {}", collectionName);
+            })
+            .subscribe(edges -> {
+
+                logger.info("For collection {}" , collectionName);
+                Integer batchId = batch.getAndIncrement();
+                logger.info("Started fetching details for collection {} batch {} ", collectionName, batchId);
+                Observable.just(edges)
+                    .subscribeOn(Schedulers.from(entityIdWriter))
+                    .subscribe(edgeScopes -> {
+
+                        List<UUID> entityIds = new ArrayList<UUID>(1000);
+
+                        for (EdgeScope edgeScope : edgeScopes) {
+                            Id entityId = edgeScope.getEdge().getTargetNode();
+                            if (entityId != null) {
+                                entityIds.add(entityId.getUuid());
+                            }
+                        }
+                        // Drop edges without a target node only after the iteration is done;
+                        // removing them inside the for-each loop above would fail with a
+                        // ConcurrentModificationException.
+                        edgeScopes.removeIf(scope -> scope.getEdge().getTargetNode() == null);
+
+                        if (edgeScopes.isEmpty()) {
+                            // nothing usable in this batch; get(0) below would fail
+                            logger.warn("No edges with a target node for collection {} batch {}, skipping batch",
+                                collectionName, batchId);
+                            return;
+                        }
+
+                        // extract name for this batch
+                        try {
+                            writeEntityIdsBatch(collectionDir, edgeScopes, batchId, collectionName);
+                            String type = edgeScopes.get(0).getEdge().getTargetNode().getType();
+
+                            Observable.just(entityIds)
+                                .subscribeOn(Schedulers.from(entityFetcher))
+                                .subscribe(entIds -> {
+
+                                    // get entities here
+                                    logger.info("entIds count {} for type {}", entIds.size(), type);
+                                    Results entities = rootEm.getEntities(entIds, type);
+                                    int size = entities.getEntities().size();
+                                    logger.info("Got {} entities.", size);
+
+                                    if(!skipConnections || !skipDictionaries || !skipAssets) {
+
+                                        ConnectableObservable<Results> entityObs = Observable.just(entities)
+                                            .publish();
+                                        // NOTE(review): the observable returned by subscribeOn() is
+                                        // discarded, so this call presumably has no effect on the
+                                        // subscriptions below - confirm the intended threading.
+                                        entityObs.subscribeOn(Schedulers.from(enitityMemberFetcher));
+
+
+                                        // fetch and write connections
+                                        if(!skipConnections) {
+                                            entityObs.subscribe(entity -> {
+                                                fetchConnections(gm, ecm, entity, collectionDir, collectionName, batchId, gson);
+
+                                            });
+                                        }
+                                        // fetch and write dictionaries
+                                        if(!skipDictionaries) {
+                                            entityObs.subscribe(entity -> {
+                                                fetchDictionaries(collectionDir, collectionName, rootEm,
+                                                    entity, gson, batchId);
+                                            });
+                                        }
+
+                                        if(!skipAssets) {
+                                            File assetsDir = createDir(collectionDir.getAbsolutePath(), "files");
+                                            entityObs.subscribe(entity -> {
+                                                try {
+                                                    fetchAssets(assetsDir, applicationId, collectionName, batchId, entities);
+                                                } catch (Exception e) {
+                                                    logger.error("Exception while trying to fetch assets for app {}, collection {}, batch {} ",
+                                                        applicationId, collectionName, batchId, e);
+                                                }
+                                            });
+                                        }
+                                        entityObs.connect();
+                                    }
+                                    writeEntities(collectionDir, entities, batchId, collectionName, gson);
+                                });
+
+                        } catch (Exception e) {
+                            logger.error("There was an error writing entity ids to file for "
+                                + edgeScopes.get(0).getEdge(), e);
+                            // since entity id writing has failed, we need to see how we can not exit the
+                            // whole program
+                            // NOTE(review): exits with status 0 although this is a failure path
+                            System.exit(0);
+                        }
+                    });
+
+                logger.info("Finished fetching details for collection {} for batch {}", collectionName, batchId);
+            });
+        logger.info("Exiting extractEntityIdsForCollection() method.");
+    }
+
+    /**
+     * Exports the binary assets of one batch of entities.
+     *
+     * Entities carrying a "file-metadata" property are listed in an assets
+     * index file and the binary content of each of them is copied from the
+     * configured binary store into {@code assetsDir}.
+     *
+     * The binary store is resolved once per batch. When it cannot be resolved
+     * the failure is logged and no content is downloaded; the index file has
+     * already been written at that point.
+     *
+     * @param assetsDir      directory receiving the index file and the asset files
+     * @param applicationId  UUID of the application owning the entities
+     * @param collectionName name of the collection, used in the file names
+     * @param batchId        number of the batch, used in the index file name
+     * @param entities       entities of the batch
+     * @throws Exception declared for callers; failures per asset are logged
+     */
+    private void fetchAssets(File assetsDir, UUID applicationId, String collectionName, Integer batchId,
+            Results entities) throws Exception {
+
+        List<Entity> entitiesWithAssets = new ArrayList<>();
+
+        for (Entity e : entities.getEntities()) {
+            if (e.getProperty("file-metadata") != null) {
+                entitiesWithAssets.add(e);
+            }
+        }
+
+        if (entitiesWithAssets.isEmpty()) {
+            return;
+        }
+
+        writeAssets(assetsDir, collectionName, batchId, entitiesWithAssets);
+
+        // Resolve the store once instead of once per entity, and stop here when it is
+        // unavailable rather than dereferencing a null store for every asset.
+        BinaryStore resolvedStore = null;
+        try {
+            resolvedStore = binaryStoreFactory
+                .getBinaryStore(properties.getProperty(PROPERTIES_USERGRID_BINARY_UPLOADER));
+        } catch (Exception e2) {
+            logger.error("Except on while trying to get binary store for property {}, ", properties.getProperty(PROPERTIES_USERGRID_BINARY_UPLOADER), e2 );
+        }
+        if (resolvedStore == null) {
+            logger.error("No binary store available, skipping asset download for collection {} batch {}",
+                collectionName, batchId);
+            return;
+        }
+        final BinaryStore binaryStore = resolvedStore;
+
+        ConnectableObservable<Entity> entityAssets = Observable.from(entitiesWithAssets).publish();
+        // NOTE(review): the observable returned by subscribeOn() is discarded - confirm intent
+        entityAssets.subscribeOn(Schedulers.from(assetsFetcher));
+        entityAssets.subscribe(e -> {
+            File file = new File(assetsDir + "/" + collectionName + "_assets_" + e.getUuid());
+            try (InputStream in = binaryStore.read(applicationId, e);
+                    OutputStream out = new BufferedOutputStream(new FileOutputStream(file));) {
+
+                // copy in chunks; reading single bytes from the store stream is needlessly slow
+                byte[] buffer = new byte[8192];
+                int read;
+                while ((read = in.read(buffer)) != -1) {
+                    out.write(buffer, 0, read);
+                }
+
+            } catch (Exception e1) {
+                logger.error("Exception while to write assets file for entity {}", e.getUuid(), e1);
+            }
+
+        });
+        entityAssets.connect();
+    }
+
+    /**
+     * Writes the asset index of one batch: one JSON object per line holding the
+     * uuid, type, "file-metadata" and "file" properties of every given entity.
+     * Failures are logged and not propagated.
+     *
+     * @param collectionDir       directory the index file is created in
+     * @param collectionName      name of the collection, used in the file name
+     * @param batchId             number of the batch, used in the file name
+     * @param entitiesWithAssets2 entities that carry asset metadata
+     */
+    private void writeAssets(final File collectionDir, final String collectionName, final Integer batchId,
+            List<Entity> entitiesWithAssets2) {
+
+        File indexFile = new File(collectionDir + "/" + collectionName + "_assets_" + batchId + ".json");
+
+        try (BufferedWriter out = new BufferedWriter(new FileWriter(indexFile))) {
+            for (Entity asset : entitiesWithAssets2) {
+                // a missing "file" property is passed on as null
+                Object fileProperty = asset.getProperty("file");
+
+                JSONObject line = new JSONObject();
+                line.put("uuid", asset.getUuid());
+                line.put("type", asset.getType());
+                line.put("file-metadata", asset.getProperty("file-metadata"));
+                line.put("file", fileProperty);
+
+                out.write(line.toString());
+                out.newLine();
+            }
+        } catch (Exception ex) {
+            logger.error("Exception while trying to write entities collection {} batch {}", collectionName, batchId,
+                ex);
+        }
+    }
+
+    /**
+     * Writes the dictionaries of one batch of entities to the batch's
+     * dictionaries file: one JSON array per entity, containing one object per
+     * non-empty dictionary. An entity without dictionaries yields an empty array.
+     *
+     * Failures for a single entity are logged and the remaining entities are
+     * still processed.
+     *
+     * @param collectionDir  directory the dictionaries file is created in
+     * @param collectionName name of the collection, used in the file name
+     * @param rootEm         entity manager used to read the dictionaries
+     * @param entity         the entities of the batch
+     * @param gson           not used by this method
+     * @param batchId        number of the batch, used in the file name
+     */
+    private void fetchDictionaries(File collectionDir, String collectionName, final EntityManager rootEm,
+            Results entity, Gson gson, Integer batchId) {
+
+        //TODO : still using JsonGenerator
+        JsonGenerator jgDictionaries = null;
+        try {
+            jgDictionaries = getJsonGenerator(new File(collectionDir + "/" + collectionName + "_" + "dictionaries_" + batchId));
+
+            for (Entity et : entity.getEntities()) {
+                try {
+                    Set<String> dictionaries = rootEm.getDictionaries(et);
+
+                    jgDictionaries.writeStartArray();
+                    if (dictionaries != null) {
+                        for (String dictionary : dictionaries) {
+                            Map<Object, Object> dict = rootEm.getDictionaryAsMap(et, dictionary);
+                            // skip dictionaries that are missing or have no entries
+                            if (dict == null || dict.isEmpty()) {
+                                continue;
+                            }
+
+                            jgDictionaries.writeStartObject();
+                            jgDictionaries.writeObjectField(dictionary, dict);
+                            jgDictionaries.writeEndObject();
+                        }
+                    }
+                    // always close the array opened above, also for entities without
+                    // dictionaries; otherwise the arrays of later entities get nested
+                    jgDictionaries.writeEndArray();
+                } catch (Exception e) {
+                    logger.error("Exception while trying to fetch dictionaries.", e);
+                }
+            }
+        } catch (Exception e) {
+            logger.error("Exception while trying to fetch dictionaries.", e);
+        } finally {
+            if (jgDictionaries != null) {
+                try {
+                    jgDictionaries.close();
+                } catch (IOException e) {
+                    logger.error("Exception while trying to close dictionaries writer.", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Writes the outgoing connections of one batch of entities to the batch's
+     * connections file, one JSON object per line.
+     *
+     * A connection is exported only when its edge is not deleted, its target
+     * node is neither null nor marked deleted, and the target entity can still
+     * be loaded. Failures are logged and not propagated.
+     *
+     * @param gm             graph manager used to walk the edges
+     * @param ecm            collection manager used to verify that targets exist
+     * @param entity         the entities of the batch
+     * @param collectionDir  directory the connections file is created in
+     * @param collectionName name of the collection, used in the file name
+     * @param batchId        number of the batch, used in the file name
+     * @param gson           serializer for the connection records
+     */
+    private void fetchConnections(GraphManager gm, final EntityCollectionManager ecm, Results entity,
+            File collectionDir, String collectionName, Integer batchId, Gson gson) {
+
+        File connectionsFile = new File(collectionDir + "/" + collectionName + "_" + "connections_" + batchId);
+
+        try (BufferedWriter out = new BufferedWriter(new FileWriter(connectionsFile))) {
+
+            for (Entity source : entity.getEntities()) {
+
+                List<ConnectionPojo> connections = new ArrayList<>();
+
+                SimpleId id = new SimpleId();
+                id.setType(source.getType());
+                id.setUuid(source.getUuid());
+
+                // the stream is consumed only for its side effect of filling "connections"
+                gm.getEdgeTypesFromSource(CpNamingUtils.createConnectionTypeSearch(id))
+                    .flatMap(emittedEdgeType -> {
+                        logger.debug("loading edges of type {} from node {}", emittedEdgeType, id);
+                        return gm.loadEdgesFromSource(new SimpleSearchByEdgeType(id, emittedEdgeType,
+                            Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent()));
+                    })
+                    .map(markedEdge -> {
+
+                        boolean usable = !markedEdge.isDeleted() && !markedEdge.isTargetNodeDeleted()
+                            && markedEdge.getTargetNode() != null;
+                        if (!usable) {
+                            return null;
+                        }
+
+                        // doing the load to just again make sure bad
+                        // connections are not exported
+                        org.apache.usergrid.persistence.model.entity.Entity target = ecm
+                            .load(markedEdge.getTargetNode()).toBlocking().lastOrDefault(null);
+
+                        if (target == null) {
+                            logger.warn(
+                                "Exported connection has a missing target node, not creating connection in export. Edge: {}",
+                                markedEdge);
+                            return null;
+                        }
+
+                        try {
+                            ConnectionPojo record = new ConnectionPojo();
+                            record.setRelationship(CpNamingUtils
+                                .getConnectionNameFromEdgeName(markedEdge.getType()));
+                            record.setSourceNodeUUID(markedEdge.getSourceNode().getUuid().toString());
+                            record.setTargetNodeUUID(markedEdge.getTargetNode().getUuid().toString());
+
+                            connections.add(record);
+                        } catch (Exception e) {
+                            logger.error("Exception while trying process connection entity", e);
+                        }
+                        return null;
+
+                    })
+                    .toBlocking().lastOrDefault(null);
+
+                for (ConnectionPojo record : connections) {
+                    out.write(gson.toJson(record));
+                    out.newLine();
+                }
+            }
+        } catch (Exception e) {
+            logger.error("Exception while trying to write connection to file.", e);
+        }
+        logger.info("Finished fetching details for collection {} batch {}", collectionName, batchId);
+    }
+
+
+    /**
+     * Plain record describing one exported connection: the UUID of the source
+     * entity, the name of the relationship and the UUID of the target entity.
+     */
+    class ConnectionPojo {
+
+        private String sourceNodeUUID;
+        private String relationship;
+        private String targetNodeUUID;
+
+        /** @return UUID of the entity the connection starts from */
+        public String getSourceNodeUUID() {
+            return this.sourceNodeUUID;
+        }
+
+        /** @return name of the connection */
+        public String getRelationship() {
+            return this.relationship;
+        }
+
+        /** @return UUID of the entity the connection points to */
+        public String getTargetNodeUUID() {
+            return this.targetNodeUUID;
+        }
+
+        public void setSourceNodeUUID(String sourceNodeUUID) {
+            this.sourceNodeUUID = sourceNodeUUID;
+        }
+
+        public void setRelationship(String relationship) {
+            this.relationship = relationship;
+        }
+
+        public void setTargetNodeUUID(String targetNodeUUID) {
+            this.targetNodeUUID = targetNodeUUID;
+        }
+    }
+
+    /**
+     * Writes the entities of one batch to the batch's data file, one JSON
+     * document per line. Failures are logged and not propagated.
+     *
+     * @param collectionDir  directory the data file is created in
+     * @param entities       entities of the batch
+     * @param batchId        number of the batch, used in the file name
+     * @param collectionName name of the collection, used in the file name
+     * @param gson           serializer for the entities
+     */
+    private void writeEntities(File collectionDir, Results entities, Integer batchId, String collectionName, Gson gson) {
+        logger.info("Started writing entities for collection {} batch {} ", collectionName, batchId);
+
+        File dataFile = new File(collectionDir + "/" + collectionName + "_data_" + batchId + ".json");
+
+        try (BufferedWriter out = new BufferedWriter(new FileWriter(dataFile))) {
+
+            logger.info("Got count {} entities for file writing", entities.getEntities().size());
+            for (Entity current : entities.getEntities()) {
+                String json = gson.toJson(current);
+                out.write(json);
+                out.newLine();
+            }
+
+        } catch (Exception e) {
+            logger.error("Exception while trying to write entities collection {} batch {}", collectionName, batchId, e);
+        }
+
+        logger.info("Finised writing entities for collection {} batch {} ", collectionName, batchId);
+    }
+
+    /**
+     * Writes the edges of one batch to the batch's id file, one edge per line
+     * in its {@code toString()} form. Write failures are logged here and not
+     * rethrown.
+     *
+     * @param collectionDir  directory the id file is created in
+     * @param edgeScopes     edges of the batch
+     * @param batchId        number of the batch, used in the file name
+     * @param collectionName name of the collection, used in the file name
+     * @throws Exception declared for callers; write failures are handled in this method
+     */
+    private void writeEntityIdsBatch(File collectionDir, List<EdgeScope> edgeScopes, Integer batchId,
+            String collectionName) throws Exception {
+        logger.info("Started writing ids for collection {} batch {} ", collectionName, batchId);
+
+        File idsFile = new File(collectionDir + "/" + collectionName + "_" + batchId);
+
+        try (BufferedWriter out = new BufferedWriter(new FileWriter(idsFile))) {
+            for (EdgeScope scope : edgeScopes) {
+                String edgeLine = scope.getEdge().toString();
+                out.write(edgeLine);
+                out.newLine();
+            }
+        } catch (Exception e) {
+            logger.error("Exception while tryign to write entity ids for collection {} batch {}", collectionName, batchId, e);
+        }
+
+        logger.info("Finished writing ids for collection {} batch {} ", collectionName, batchId);
+    }
+
}
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index c46f4b7..53e5ef3 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -293,8 +293,8 @@
try {
ExportConnection connection = new ExportConnection(
- applicationName,
organizationName,
+ applicationName,
connectionType,
exportEntity.getEntity().getUuid(),
connectedEntity.getUuid());
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
index 6fb0911..5865d15 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
@@ -26,11 +26,15 @@
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
import com.fasterxml.jackson.databind.ObjectMapper;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
+import org.apache.usergrid.services.assets.BinaryStoreFactory.Provider;
import org.apache.usergrid.utils.ConversionUtils;
import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.MissingOptionException;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
@@ -39,7 +43,6 @@
/**
* Base class for ToolBase implementations that write output to an output directory and file.
*
- * @author zznate
*/
public abstract class ExportingToolBase extends ToolBase {
@@ -49,9 +52,31 @@
/** Output dir option: -outputDir */
protected static final String OUTPUT_DIR = "outputDir";
+    /**
+     * Organization id option: -orgId. The name is case-sensitive and must stay
+     * "orgId": that is the name the option had before this constant existed,
+     * the name read by applyOrgId(), and the name shown in the help texts.
+     */
+    protected static final String ORG_ID = "orgId";
+    /** Organization name option: -orgName */
+    protected static final String ORG_NAME = "orgName";
+    /** Application id option: -appId */
+    protected static final String APP_ID = "appId";
+    /** Application name option: -appName */
+    protected static final String APP_NAME = "appName";
+    /** Comma separated collection names option: -collNames */
+    protected static final String COLL_NAMES = "collNames";
+    /** Flag: append the start timestamp to the output directory name */
+    protected static final String APPEND_TIMESTAMP = "appendTimestamp";
+    /** Flag: do not export connections */
+    protected static final String SKIP_CONN = "skipConnections";
+    /** Flag: do not export dictionaries */
+    protected static final String SKIP_DICT = "skipDictionaries";
+    /** Last edge of a previous run, used to resume an export: -lastEdge */
+    protected static final String LAST_EDGE = "lastEdge";
+    /** Flag: do not export assets */
+    protected static final String SKIP_ASSETS = "skipAssets";
+    /** Field type for the unique value check: -fieldType */
+    protected static final String FIELD_TYPE = "fieldType";
+    /** Single collection name for the unique value check: -collectionName */
+    protected static final String COLLECTION_NAME = "collectionName";
protected String baseOutputDirName = "export";
protected UUID orgId;
+ protected String orgName;
+ protected UUID applicationId;
+ protected String applicationName;
+ protected String[] collNames;
+ protected String fieldType;
+ protected boolean skipConnections = false;
+ protected boolean skipDictionaries = false;
+ protected boolean skipAssets = false;
+ protected String lastEdgeJson = null;
+
JsonFactory jsonFactory = new JsonFactory();
protected long startTime = System.currentTimeMillis();
@@ -62,11 +87,41 @@
Options options = super.createOptions();
- Option outputDir = OptionBuilder.hasArg().withDescription( "output file name -outputDir" ).create( OUTPUT_DIR );
- Option orgId = OptionBuilder.hasArg().withDescription( "Use a specific organization -orgId" ).create( "orgId" );
-
+ Option outputDir = OptionBuilder.hasArg().withDescription( "Output file name -outputDir" ).create( OUTPUT_DIR );
+ Option orgId = OptionBuilder.hasArg().withDescription( "Use a specific organization -orgId" ).create( ORG_ID );
+ Option appId = OptionBuilder.hasArg().withDescription( "Use a specific application -appId (Needs -orgId or -orgName)" ).create( APP_ID );
+ Option orgName = OptionBuilder.hasArg().withDescription( "Use a specific organization name -orgName" ).create( ORG_NAME );
+ Option appName = OptionBuilder.hasArg().withDescription( "Use a specific application name -appName (Needs -orgId or -orgName)" ).create( APP_NAME );
+ Option collNames = OptionBuilder.hasArg().withDescription( "Export list of comma separated collections -collNames (Needs -orgId or -orgName and -appId or -appName)" ).create( COLL_NAMES );
+ Option appendTimestamp = OptionBuilder.withDescription( "Attach timestamp to output directory -appendTimestamp" ).create( APPEND_TIMESTAMP );
+ Option skipConns = OptionBuilder.withDescription( "Skip exporting connections for entities -skipConnections" ).create( SKIP_CONN );
+ Option skipDicts = OptionBuilder.withDescription( "Skip exporting dictionaries for entities -skipDictionaries" ).create( SKIP_DICT );
+ Option lastEdge = OptionBuilder.hasArg().withDescription( "Last Edge from previous run to resume export -lastEdge" ).create( LAST_EDGE );
+ Option skipAssets = OptionBuilder.withDescription( "Skip exporting assets for entities -skipAssets" ).create( SKIP_ASSETS );
+ Option awsKey = OptionBuilder.hasArg().withDescription( "AWS access key -awsKey" ).create( AWS_KEY );
+ Option storeType = OptionBuilder.hasArg().withDescription( "Binary store type -storeType (aws, google, local)" ).create( STORE_TYPE );
+ Option awsId = OptionBuilder.hasArg().withDescription( "AWS access id -awsId" ).create( AWS_ID );
+ Option bucketName = OptionBuilder.hasArg().withDescription( "Binary storage bucket name -bucketName" ).create( BINARY_BUCKET_NAME );
+ Option fieldType = OptionBuilder.hasArg().withDescription( "Field type for unique value check -fieldType" ).create( FIELD_TYPE );
+ Option collectionName = OptionBuilder.hasArg().withDescription( "Collection name for unique value check -collectionName" ).create( COLLECTION_NAME );
+
options.addOption( outputDir );
options.addOption( orgId );
+ options.addOption( appId );
+ options.addOption( collNames );
+ options.addOption( appName );
+ options.addOption( orgName );
+ options.addOption( appendTimestamp );
+ options.addOption( skipConns );
+ options.addOption( skipDicts );
+ options.addOption( lastEdge );
+ options.addOption( skipAssets );
+ options.addOption( awsKey );
+ options.addOption( awsId );
+ options.addOption( bucketName );
+ options.addOption( storeType );
+ options.addOption(fieldType);
+ options.addOption(collectionName);
return options;
}
@@ -75,10 +130,15 @@
 protected void prepareBaseOutputFileName( CommandLine line ) {
 boolean hasOutputDir = line.hasOption( OUTPUT_DIR );
+ boolean appendTimestamp = line.hasOption( APPEND_TIMESTAMP );
 if ( hasOutputDir ) {
 baseOutputDirName = line.getOptionValue( OUTPUT_DIR );
 }
+
+ // optionally make the directory name unique per run by appending the tool start time (millis)
+ if(appendTimestamp) {
+ baseOutputDirName = baseOutputDirName + "_"+startTime;
+ }
 }
@@ -87,6 +147,71 @@
orgId = ConversionUtils.uuid( line.getOptionValue( "orgId" ) );
}
}
+
+
+    /**
+     * Copies the export related command line options into the fields of this tool.
+     *
+     * An organization id takes precedence over an organization name, and an
+     * application id over an application name. A single -collectionName
+     * replaces any list given through -collNames.
+     *
+     * @param line the parsed command line
+     */
+    protected void applyExportParams( CommandLine line ) {
+
+        // organization: id wins over name
+        if ( line.hasOption( ORG_ID ) ) {
+            orgId = ConversionUtils.uuid( line.getOptionValue( ORG_ID ) );
+        } else if ( line.hasOption( ORG_NAME ) ) {
+            orgName = line.getOptionValue( ORG_NAME );
+        }
+
+        // application: id wins over name
+        if ( line.hasOption( APP_ID ) ) {
+            applicationId = ConversionUtils.uuid( line.getOptionValue( APP_ID ) );
+        } else if ( line.hasOption( APP_NAME ) ) {
+            applicationName = line.getOptionValue( APP_NAME );
+        }
+
+        // collections: the single-name option is applied last and therefore wins
+        if ( line.hasOption( COLL_NAMES ) ) {
+            String commaSeparated = line.getOptionValue( COLL_NAMES );
+            collNames = commaSeparated.split(",");
+        }
+        if ( line.hasOption( COLLECTION_NAME ) ) {
+            String single = line.getOptionValue( COLLECTION_NAME );
+            collNames = new String[] { single };
+        }
+
+        skipConnections = line.hasOption( SKIP_CONN );
+        skipDictionaries = line.hasOption( SKIP_DICT );
+
+        if ( line.hasOption( LAST_EDGE ) ) {
+            lastEdgeJson = line.getOptionValue( LAST_EDGE );
+        }
+
+        skipAssets = line.hasOption( SKIP_ASSETS );
+
+        if ( line.hasOption( FIELD_TYPE ) ) {
+            fieldType = line.getOptionValue( FIELD_TYPE );
+        }
+    }
+
+    /**
+     * Checks that options which depend on each other were passed together.
+     *
+     * An application needs an organization, collection names need an
+     * application, and - unless assets are skipped - a store type is required
+     * together with the options that store type needs.
+     *
+     * @param line the parsed command line
+     * @throws MissingOptionException if a required or dependent option is missing
+     */
+    protected void validateOptions(CommandLine line) throws MissingOptionException {
+        boolean appGiven = line.hasOption(APP_ID) || line.hasOption(APP_NAME);
+        boolean orgGiven = line.hasOption(ORG_ID) || line.hasOption(ORG_NAME);
+
+        if (appGiven && !orgGiven) {
+            throw new MissingOptionException("-orgId or -orgName is required if you pass -appId or -appName");
+        }
+        if (line.hasOption(COLL_NAMES) && !appGiven) {
+            throw new MissingOptionException(
+                "[-appId or -appName] and [-orgId or -orgName] are required if you pass -collNames");
+        }
+
+        // the binary store settings only matter when assets are exported
+        if (line.hasOption(SKIP_ASSETS)) {
+            return;
+        }
+        if (!line.hasOption(STORE_TYPE)) {
+            throw new MissingOptionException("[-storeType] is required if you do not pass -skipAssets");
+        }
+
+        String storeType = line.getOptionValue(STORE_TYPE);
+        if (storeType.equals(Provider.aws.toString())) {
+            boolean awsComplete = line.hasOption(AWS_ID) && line.hasOption(AWS_KEY)
+                && line.hasOption(BINARY_BUCKET_NAME);
+            if (!awsComplete) {
+                throw new MissingOptionException(
+                    "[-awsId and -awsKey and -bucketName] are required if you pass -storeType as aws");
+            }
+        } else if (storeType.equals(Provider.google.toString())) {
+            if (!line.hasOption(BINARY_BUCKET_NAME)) {
+                throw new MissingOptionException("[-bucketName] is required if you pass -storeType as google");
+            }
+        }
+    }
/**
@@ -116,10 +241,24 @@
protected File createOutputFile( File parent, String type, String name ) {
return new File( parent, prepareOutputFileName( type, name ) );
}
+
+    /**
+     * Creates the directory of an organization below the output directory.
+     *
+     * @param orgName name of the organization, used as directory name
+     * @return the organization directory
+     */
+    protected File createOrgDir( String orgName ) {
+        String outputPath = outputDir.getAbsolutePath();
+        return createDir( outputPath, orgName );
+    }
+
+    /**
+     * Creates the directory of an application below its organization directory.
+     *
+     * @param orgDir          directory of the owning organization
+     * @param applicationName name of the application, used as directory name
+     * @return the application directory
+     */
+    protected File createApplicationDir( File orgDir, String applicationName ) {
+        String orgPath = orgDir.getAbsolutePath();
+        return createDir( orgPath, applicationName );
+    }
- protected File createCollectionsDir( String applicationName ) {
- return createDir( String.format( "%s/%s.applicationName.collections", outputDir, applicationName ) );
+    /**
+     * Creates the directory of a collection below its application directory.
+     *
+     * @param appDir         directory of the owning application
+     * @param collectionName name of the collection, used as directory name
+     * @return the collection directory
+     */
+    protected File createCollectionDir( File appDir, String collectionName ) {
+        String appPath = appDir.getAbsolutePath();
+        return createDir( appPath, collectionName );
+    }
+
+
+    /**
+     * Creates the directory {@code dirName} below {@code baseDir}; when
+     * {@code baseDir} is null or blank, {@code dirName} is used on its own.
+     *
+     * @param baseDir parent directory path, may be null or blank
+     * @param dirName name of the directory to create
+     * @return the created directory
+     */
+    protected File createDir( String baseDir, String dirName ) {
+        boolean hasBase = baseDir != null && !baseDir.trim().equals("");
+        String path = hasBase ? (baseDir + File.separator + dirName) : dirName;
+        return createDir( path );
 }
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
index 62636ea..e2711bb 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ToolBase.java
@@ -71,9 +71,11 @@
protected CassandraService cass;
protected Injector injector;
-
-
-
+
+ protected static final String AWS_KEY = "awsKey";
+ protected static final String AWS_ID = "awsId";
+ protected static final String BINARY_BUCKET_NAME = "bucketName";
+ protected static final String STORE_TYPE = "storeType";
public void startTool( String[] args ) {
startTool( args, true );
@@ -93,6 +95,15 @@
if ( line == null ) {
return;
}
+
+
+ try {
+ validateOptions(line);
+ }
+ catch ( MissingOptionException exp ) {
+ printCliHelp( "Required or dependent options are missing. Reason: " + exp.getMessage() );
+ }
+
// notification queue listener not needed for tools
System.setProperty("usergrid.notifications.listener.run", "false");
@@ -105,11 +116,44 @@
if ( line.hasOption( "host" ) ) {
System.setProperty( "cassandra.url", line.getOptionValue( "host" ) );
+ }
+
+ if ( line.hasOption( "eshost" ) ) {
System.setProperty( "elasticsearch.hosts", line.getOptionValue( "eshost" ) );
+ }
+
+ if ( line.hasOption( "escluster" ) ) {
System.setProperty( "elasticsearch.cluster_name", line.getOptionValue( "escluster" ) );
+ }
+
+ if ( line.hasOption( "ugcluster" ) ) {
System.setProperty( "usergrid.cluster_name", line.getOptionValue( "ugcluster" ) );
}
+ if ( line.hasOption( "appkeyspace" ) ) {
+ System.setProperty( "cassandra.keyspace.application", line.getOptionValue( "appkeyspace" ) );
+ }
+
+ if ( line.hasOption( "lockskeyspace" ) ) {
+ System.setProperty( "cassandra.lock.keyspace", line.getOptionValue( "lockskeyspace" ) );
+ }
+
+ if(line.hasOption(AWS_ID)) {
+ System.setProperty("AWS_ACCESS_KEY_ID", line.getOptionValue( AWS_ID ));
+ }
+
+ if(line.hasOption(AWS_KEY)) {
+ System.setProperty("AWS_SECRET_KEY", line.getOptionValue( AWS_KEY ));
+ }
+
+ if(line.hasOption(BINARY_BUCKET_NAME)) {
+ System.setProperty("usergrid.binary.bucketname", line.getOptionValue( BINARY_BUCKET_NAME ));
+ }
+
+ if(line.hasOption( STORE_TYPE )) {
+ System.setProperty("usergrid.binary.uploader", line.getOptionValue( STORE_TYPE ));
+ }
+
try {
runTool( line );
}
@@ -153,6 +197,12 @@
Option remoteOption = OptionBuilder
.withDescription( "Use remote Cassandra instance" ).create( "remote" );
+ Option ugAppKeyspace = OptionBuilder.withArgName( "appkeyspace" ).hasArg()
+ .withDescription( "Usergrid Application keyspace" ).create( "appkeyspace" );
+
+ Option ugLocksKeyspace = OptionBuilder.withArgName( "lockskeyspace" ).hasArg()
+ .withDescription( "Usergrid Locks keyspace" ).create( "lockskeyspace" );
+
Option verbose = OptionBuilder
.withDescription( "Print on the console an echo of the content written to the file" )
.create( VERBOSE );
@@ -163,10 +213,17 @@
options.addOption( esClusterOption );
options.addOption( ugClusterOption );
options.addOption( remoteOption );
+ options.addOption( ugAppKeyspace );
+ options.addOption( ugLocksKeyspace );
options.addOption( verbose );
return options;
}
+
+
+ /**
+ * Hook for checking the parsed command line before the tool runs. This
+ * default implementation accepts every command line.
+ *
+ * @param line the parsed command line
+ * @throws MissingOptionException if a required or dependent option is missing
+ */
+ protected void validateOptions(CommandLine line) throws MissingOptionException {
+
+ }
public void startEmbedded() throws Exception {
@@ -278,7 +335,10 @@
" cassandra.connections: {}\n" +
" usergrid.notifications.listener.run: {}\n" +
" usergrid.push.worker_count: {}\n" +
- " usergrid.scheduler.enabled: {}\n",
+ " usergrid.scheduler.enabled: {}\n" +
+ " cassandra.readcl: {}\n" +
+ " usergrid.read.cl: {}\n" +
+ " usergrid.binary.uploader: {}\n",
properties.get("cassandra.url"),
properties.get("cassandra.datacenter.local"),
properties.get("cassandra.username"),
@@ -290,7 +350,11 @@
properties.get("cassandra.connections"),
properties.get("usergrid.notifications.listener.run"),
properties.get("usergrid.push.worker_count"),
- properties.get("usergrid.scheduler.enabled")
+ properties.get("usergrid.scheduler.enabled"),
+ properties.get("cassandra.readcl"),
+ properties.get("usergrid.read.cl"),
+ properties.get("usergrid.binary.uploader")
+
);
}
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueRepairer.java b/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueRepairer.java
new file mode 100644
index 0000000..04915be
--- /dev/null
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueRepairer.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.tools;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.management.OrganizationInfo;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.field.StringField;
+import org.apache.usergrid.utils.ConversionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.google.common.base.Optional;
+import com.google.common.collect.BiMap;
+
+import rx.Observable;
+import rx.observables.ConnectableObservable;
+import rx.schedulers.Schedulers;
+
+public class UniqueValueRepairer extends ExportingToolBase {
+
+ static final Logger logger = LoggerFactory.getLogger(UniqueValueRepairer.class);
+ // NOTE(review): jsonFactory and LAST_ID are not referenced anywhere else in this class.
+ JsonFactory jsonFactory = new JsonFactory();
+ public static final String LAST_ID = "lastId";
+ // Names of the command-line flags registered in createOptions() and read in applyInputParams().
+ public static final String FIND_MISSING_UNIQUE_VALUES = "findMissingUniqueValues";
+ public static final String FIX_MISSING_VALUES = "fixUniqueValues";
+ // State of the two flags above; both default to false.
+ private boolean findMissingUniqueValues = false;
+ private boolean fixMissingValue = false;
+ // lastEdge is never assigned in this class, so extractEntitiesForCollection() always passes Optional.absent().
+ private AllEntityIdsObservable allEntityIdsObs;
+ private SimpleEdge lastEdge = null;
+ // Fixed-size worker pools (10 and 50 threads); nothing in this class shuts them down.
+ private ExecutorService entityFetcher = Executors.newFixedThreadPool(10);
+ private ExecutorService uniqueValueChecker = Executors.newFixedThreadPool(50);
+ // Collaborators obtained from the injector in runTool().
+ private Session session;
+ private UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
+ private MvccEntitySerializationStrategy mvccEntitySerializationStrategy;
+
+ /**
+  * Adds the no-argument -findMissingUniqueValues and -fixUniqueValues flags to the inherited options.
+  */
+ @Override
+ @SuppressWarnings("static-access")
+ public Options createOptions() {
+     final Options toolOptions = super.createOptions();
+
+     // Each flag is built and registered in one expression; neither takes an argument.
+     toolOptions.addOption(OptionBuilder
+             .withDescription("Find entities with missing unique value entry -findMissingUniqueValues")
+             .create(FIND_MISSING_UNIQUE_VALUES));
+     toolOptions.addOption(OptionBuilder
+             .withDescription("Fix entities with missing unique value entry -fixUniqueValues")
+             .create(FIX_MISSING_VALUES));
+
+     return toolOptions;
+ }
+
+ /**
+  * Calls startSpring(), resolves the collaborators from the injector, runs the scan and then sleeps until interrupted.
+  *
+  * @param line parsed command line
+  */
+ @Override
+ public void runTool(CommandLine line) throws Exception {
+     startSpring();
+     setVerbose(line);
+     this.allEntityIdsObs = injector.getInstance(AllEntityIdsObservable.class);
+     applyInputParams(line);
+     mvccEntitySerializationStrategy = injector.getInstance(MvccEntitySerializationStrategy.class);
+     uniqueValueSerializationStrategy = injector.getInstance(UniqueValueSerializationStrategy.class);
+     session = injector.getInstance(Session.class);
+     startEntityScan();
+     logger.info("Finished checking entities. Waiting for threads to complete execution.");
+     // Nothing tracks the worker tasks yet, so keep the JVM alive by sleeping. An interrupt is the
+     // only stop signal: restore the flag and return instead of logging it and looping on forever.
+     try {
+         while (true) {
+             Thread.sleep(10000);
+         }
+     } catch (InterruptedException e) {
+         logger.error("Exception while waiting for unique check to complete.", e);
+         Thread.currentThread().interrupt();
+     }
+ }
+
+ private void startEntityScan() throws Exception, UnsupportedEncodingException {
+
+ for (Entry<UUID, String> organizationName : getOrgs().entrySet()) {
+
+ // Let's skip the test entities.
+ if (organizationName.equals(properties.getProperty("usergrid.test-account.organization"))) {
+ continue;
+ }
+ fetchApplicationsForOrgs(organizationName.getKey(), organizationName.getValue());
+ }
+ }
+
+ private Map<UUID, String> getOrgs() throws Exception {
+ // Loop through the organizations
+ Map<UUID, String> organizationNames = null;
+
+ if (orgId == null && (orgName == null || orgName.trim().equals(""))) {
+ organizationNames = managementService.getOrganizations();
+ } else {
+ OrganizationInfo info = null;
+
+ if (orgId != null) {
+ info = managementService.getOrganizationByUuid(orgId);
+ } else {
+ info = managementService.getOrganizationByName(orgName);
+ }
+
+ if (info == null) {
+ logger.error("Organization info is null!");
+ System.exit(1);
+ }
+
+ organizationNames = new HashMap<UUID, String>();
+ organizationNames.put(info.getUuid(), info.getName());
+ }
+
+ return organizationNames;
+ }
+
+ /**
+  * Scans the applications of one organization: every application when none was selected on
+  * the command line, otherwise only the selected one, provided it belongs to this organization.
+  *
+  * @param orgId   UUID of the organization
+  * @param orgName organization name; prefix of the "org/app" key used for the by-name lookup
+  * @throws Exception propagated from the management service lookup
+  */
+ private void fetchApplicationsForOrgs(UUID orgId, String orgName) throws Exception {
+     logger.info("Fetch applications for {} : {} ", orgId, orgName);
+     BiMap<UUID, String> applications = managementService.getApplicationsForOrganization(orgId);
+     Map<UUID, String> toScan = new java.util.LinkedHashMap<UUID, String>();
+
+     if (applicationId == null && (applicationName == null || applicationName.trim().equals(""))) {
+         for (Entry<UUID, String> appEntry : applications.entrySet()) {
+             // Values are expected to look like "org/app"; keep the raw value if the separator is missing.
+             String[] parts = appEntry.getValue().split("/");
+             toScan.put(appEntry.getKey(), parts.length > 1 ? parts[1] : appEntry.getValue());
+         }
+     } else {
+         UUID appId = applicationId != null ? applicationId
+                 : applications.inverse().get(orgName + '/' + applicationName);
+         String appName = applicationId != null ? applications.get(appId) : applicationName;
+         if (appId == null || appName == null) {
+             // Not an application of this organization: scanning it anyway would pass a null id
+             // or repeat the same scan once for every organization.
+             logger.info("Selected application not found in organization {} : {}", orgId, orgName);
+         } else {
+             toScan.put(appId, appName);
+         }
+     }
+     for (Entry<UUID, String> app : toScan.entrySet()) {
+         try {
+             fetchApplications(app.getKey(), app.getValue());
+         } catch (Exception e) {
+             logger.error("There was an exception fetching application {} : {}", app.getValue(), app.getKey(), e);
+         }
+     }
+ }
+
+ private void fetchApplications(UUID appId, String appName) throws Exception {
+
+ logger.info("Fetching application for {} : {} ", appName, appId);
+
+ EntityManager em = emf.getEntityManager(appId);
+
+ Set<String> collections = em.getApplicationCollections();
+
+ if (collNames == null || collNames.length <= 0) {
+ logger.info("Please pass collection name ( -collectionName testCollection ) ");
+ } else {
+ Observable.from(collNames).subscribe(collectionName -> {
+ if (collections.contains(collectionName)) {
+ fetchCollections(appId, collectionName, em);
+ }
+ });
+ }
+
+ }
+
+ private void fetchCollections(UUID appId, String collectionName, EntityManager em) {
+ extractEntitiesForCollection(appId, collectionName); // pure delegation; the em argument is not used
+ }
+
+ private void extractEntitiesForCollection(UUID applicationId, String collectionName) {
+
+ AtomicInteger batch = new AtomicInteger(1);
+
+ final EntityManager rootEm = emf.getEntityManager(applicationId);
+
+ ExecutorService edgeScopeFetcher = Executors.newFixedThreadPool(1);
+ allEntityIdsObs
+ .getEdgesToEntities(Observable.just(CpNamingUtils.getApplicationScope(applicationId)),
+ Optional.fromNullable(
+ CpNamingUtils.getEdgeTypeFromCollectionName(collectionName.toLowerCase())),
+ (lastEdge == null ? Optional.absent() : Optional.fromNullable(lastEdge)))
+ .buffer(1000).finallyDo(() -> {
+ edgeScopeFetcher.shutdown();
+ logger.info("Finished fetching entity ids for {}. Shutting down entity edge scope fetcher ",
+ collectionName);
+ while (!edgeScopeFetcher.isTerminated()) {
+ try {
+ edgeScopeFetcher.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ }
+ }
+ logger.info("Entity edge scope fetcher terminated after shutdown for {}", collectionName);
+ }).subscribe(edges -> {
+
+ logger.info("For collection {}", collectionName);
+ Integer batchId = batch.getAndIncrement();
+ logger.info("Started fetching details for collection {} batch {} ", collectionName, batchId);
+ Observable.just(edges).subscribeOn(Schedulers.from(edgeScopeFetcher)).subscribe(edgeScopes -> {
+
+ List<UUID> entityIds = new ArrayList<UUID>(1000);
+
+ for (EdgeScope edgeScope : edgeScopes) {
+ Id entityId = edgeScope.getEdge().getTargetNode();
+ if (entityId != null) {
+ entityIds.add(entityId.getUuid());
+ } else {
+ edgeScopes.remove(edgeScope);
+ }
+ }
+ try {
+ String type = edgeScopes.get(0).getEdge().getTargetNode().getType();
+
+ Observable.just(entityIds).subscribeOn(Schedulers.from(entityFetcher)) // change to
+ .subscribe(entIds -> {
+
+ logger.info("Fetched {} entity id's of type {} for batch ID {}", entIds.size(),
+ type, batchId);
+ Results entities = rootEm.getEntities(entIds, type);
+ logger.info("Fetched {} entities of type {} for batch ID {}", entities.size(),
+ type, batchId);
+ try {
+
+ ConnectableObservable<Entity> entityObs = Observable
+ .from(entities.getEntities()).publish();
+ entityObs.subscribeOn(Schedulers.from(uniqueValueChecker));
+ entityObs.subscribe(t -> {
+ logger.info("Fetched entity with UUID : {}", t.getUuid());
+ if (findMissingUniqueValues) {
+ String fieldValue = null;
+ //We can search entity with UUID or name/email based on the entity type.
+ //This mapping between unique value field(name/email etc) and UUID,
+ //is stored in unique value table. This can either be name / email or any other type.
+ //This value is being passed as field type.
+ //The code below takes the parameter and retrieves the value of the field using the getter method.
+ if (fieldType == null || fieldType.equals("")
+ || fieldType.equals("name")) {
+ fieldType = "name";
+ fieldValue = t.getName();
+ } else {
+ try {
+ Method method = t.getClass()
+ .getMethod("get"
+ + fieldType.substring(0, 1).toUpperCase()
+ + fieldType.substring(1));
+ fieldValue = (String) method.invoke(t);
+ } catch (Exception e1) {
+ logger.error(
+ "Exception while trying to fetch field value of type {} for entity {} batch {}",
+ fieldType, t.getUuid(), batchId, e1);
+ }
+ }
+ try {
+ if (fieldValue != null) {
+
+ Entity e = rootEm.getUniqueEntityFromAlias(t.getType(),
+ fieldValue, false);
+
+ if (e == null) {
+ logger.info(
+ "No entity found for field type {} and field value {} but exists for UUID {}",
+ fieldType, fieldValue, t.getUuid());
+ if (fixMissingValue) {
+ logger.info(
+ "Trying to repair unique value mapping for {} ",
+ t.getUuid());
+ UniqueValueSet uniqueValueSet = uniqueValueSerializationStrategy
+ .load(new ApplicationScopeImpl(new SimpleId(
+ applicationId, "application")),
+ ConsistencyLevel
+ .valueOf(System.getProperty(
+ "usergrid.read.cl",
+ "LOCAL_QUORUM")),
+ t.getType(),
+ Collections.singletonList(
+ new StringField(fieldType,
+ fieldValue)),
+ false);
+
+ ApplicationScope applicationScope = new ApplicationScopeImpl(
+ new SimpleId(applicationId, "application"));
+ com.google.common.base.Optional<MvccEntity> entity = mvccEntitySerializationStrategy
+ .load(applicationScope, new SimpleId(
+ t.getUuid(), t.getType()));
+
+ if (!entity.isPresent()
+ || !entity.get().getEntity().isPresent()) {
+ throw new RuntimeException(
+ "Unable to update unique value index because supplied UUID "
+ + t.getUuid()
+ + " does not exist");
+ }
+ logger.info("Delete unique value: {}",
+ uniqueValueSet.getValue(fieldType));
+ try {
+ session.execute(uniqueValueSerializationStrategy
+ .deleteCQL(applicationScope,
+ uniqueValueSet
+ .getValue(fieldType)));
+ } catch (Exception ex) {
+ logger.error(
+ "Exception while trying to delete the Unique value for {}. Will proceed with creating new entry",
+ t.getUuid(), ex);
+ }
+
+ UniqueValue newUniqueValue = new UniqueValueImpl(
+ new StringField(fieldType, fieldValue),
+ entity.get().getId(),
+ entity.get().getVersion());
+ logger.info("Writing new unique value: {}",
+ newUniqueValue);
+ session.execute(uniqueValueSerializationStrategy
+ .writeCQL(applicationScope, newUniqueValue,
+ -1));
+ }
+
+ } else {
+ logger.info(
+ "Found entity {} for field type {} and field value {}",
+ e.getUuid(), fieldType, fieldValue);
+ }
+ } else {
+ logger.info("No value found for field {} for entity {}",
+ fieldType, t.getUuid());
+ }
+ } catch (Exception e) {
+ logger.error(
+ "Error while checking unique values for batch id : {} for entity {}",
+ batchId, t.getUuid(), e);
+ }
+ }
+ });
+ entityObs.connect();
+
+ } catch (Exception e) {
+ logger.error(
+ "Error while checking unique values for batch id : {} for collection {}",
+ batchId, collectionName, e);
+ }
+ });
+
+ } catch (Exception e) {
+ logger.error("Exception while traversing entities " + edgeScopes.get(0).getEdge(), e);
+ System.exit(0);
+ }
+ });
+ logger.info("Finished entity walk for collection {} for batch {}", collectionName, batchId);
+ });
+ logger.info("Exiting extractEntitiesForCollection() method.");
+ }
+
+ protected void applyInputParams(CommandLine line) {
+
+ if (line.hasOption(ORG_ID)) {
+ orgId = ConversionUtils.uuid(line.getOptionValue(ORG_ID));
+ } else if (line.hasOption(ORG_NAME)) {
+ orgName = line.getOptionValue(ORG_NAME);
+ }
+
+ if (line.hasOption(APP_ID)) {
+ applicationId = ConversionUtils.uuid(line.getOptionValue(APP_ID));
+ } else if (line.hasOption(APP_NAME)) {
+ applicationName = line.getOptionValue(APP_NAME);
+ }
+ if (line.hasOption(COLL_NAMES)) {
+ collNames = line.getOptionValue(COLL_NAMES).split(",");
+ }
+ if (line.hasOption(COLLECTION_NAME)) {
+ collNames = new String[] { line.getOptionValue(COLLECTION_NAME) };
+ }
+ findMissingUniqueValues = line.hasOption(FIND_MISSING_UNIQUE_VALUES);
+ fixMissingValue = line.hasOption(FIX_MISSING_VALUES);
+
+ }
+}