blob: 049cd4ed07fc988323c2f1447e830e955c34f7c8 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.usergrid.corepersistence.asyncevents;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.usergrid.corepersistence.index.*;
import org.apache.usergrid.persistence.model.util.CollectionUtils;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
import org.apache.usergrid.persistence.index.IndexEdge;
import org.apache.usergrid.persistence.index.SearchEdge;
import org.apache.usergrid.persistence.index.impl.IndexEdgeImpl;
import org.apache.usergrid.persistence.index.impl.SearchEdgeImpl;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.corepersistence.index.CollectionVersionUtils;
import org.apache.usergrid.utils.InflectionUtils;
import org.apache.usergrid.utils.UUIDUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.Schema;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.collection.MvccLogEntry;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.field.Field;
import rx.Observable;
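/**
 * Service that executes event flows: it performs the graph and collection work for entity and edge events
 * and builds the resulting index / de-index operations.
 */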
public class EventBuilderImpl implements EventBuilder {
private static final Logger logger = LoggerFactory.getLogger( EventBuilderImpl.class );
// Used to index new edges/entities and to de-index edges and old entity versions.
private final IndexService indexService;
// Creates the per-application EntityCollectionManager used to load entities and to read/delete MVCC log entries.
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
// Creates the per-application GraphManager used to delete edges and compact nodes.
private final GraphManagerFactory graphManagerFactory;
// Read for getBufferSize(), which batches version deletes in buildEntityDelete.
private final SerializationFig serializationFig;
// Resolves a collection's current version when building collection-versioned ids, entities and edges.
private final CollectionVersionManagerFactory collectionVersionManagerFactory;
/**
 * Creates an event builder wired to its collaborators. Arguments are stored as-is; no validation is done.
 *
 * @param indexService                    service used to build index and de-index operations
 * @param entityCollectionManagerFactory  factory for per-application entity collection managers
 * @param graphManagerFactory             factory for per-application graph (edge) managers
 * @param serializationFig                serialization settings (buffer size for batched version deletes)
 * @param collectionVersionManagerFactory factory used to look up a collection's current version
 */
public EventBuilderImpl( final IndexService indexService,
                         final EntityCollectionManagerFactory entityCollectionManagerFactory,
                         final GraphManagerFactory graphManagerFactory, final SerializationFig serializationFig,
                         final CollectionVersionManagerFactory collectionVersionManagerFactory) {
    this.collectionVersionManagerFactory = collectionVersionManagerFactory;
    this.serializationFig = serializationFig;
    this.graphManagerFactory = graphManagerFactory;
    this.entityCollectionManagerFactory = entityCollectionManagerFactory;
    this.indexService = indexService;
}
public Id getCollectionVersionedId(ApplicationScope applicationScope, Id id, boolean forceVersion) {
String currentCollectionName = InflectionUtils.pluralize(id.getType());
// if already versioned, or not a custom (versionable) collection, we're done
if (!CollectionUtils.isCustomCollectionOrEntityName(currentCollectionName) ||
CollectionVersionUtils.isVersionedName(currentCollectionName)) {
return id;
CollectionVersionManager cvm = collectionVersionManagerFactory.getInstance(
new CollectionScopeImpl(applicationScope.getApplication(), currentCollectionName)
String currentCollectionVersion = cvm.getCollectionVersion(true);
String newEntityType = CollectionVersionUtils.buildVersionedNameString(id.getType(), currentCollectionVersion,
false, forceVersion);
return new SimpleId(id.getUuid(), newEntityType);
public Entity getCollectionVersionedEntity(final ApplicationScope applicationScope, final Entity entity, boolean forceVersion) {
return new Entity(getCollectionVersionedId(applicationScope, entity.getId(), forceVersion), entity.getVersion() );
/**
 * Returns a copy of {@code edge} whose source and target node ids are replaced by their collection-versioned
 * forms (see {@code getCollectionVersionedId}).
 * <p>
 * A {@link SimpleMarkedEdge} input is rebuilt as a {@link SimpleMarkedEdge}. Any other edge is rebuilt as a
 * {@link SimpleEdge} carrying the original edge type and timestamp.
 * <p>
 * NOTE(review): the type test checks the concrete class {@code SimpleMarkedEdge}, but the cast is to the
 * {@code MarkedEdge} interface. Other {@code MarkedEdge} implementations therefore take the SimpleEdge branch
 * and lose their marker state. Confirm this is intended.
 */
public Edge getCollectionVersionedEdge(final ApplicationScope applicationScope, final Edge edge, boolean forceVersion) {
Edge returnEdge;
if (edge instanceof SimpleMarkedEdge) {
MarkedEdge markedEdge = (MarkedEdge)edge;
// NOTE(review): as written, this argument list stops after the target id with a trailing comma; verify the
// remaining constructor arguments (presumably type, timestamp and marker flags) against upstream.
returnEdge = new SimpleMarkedEdge(
getCollectionVersionedId(applicationScope, markedEdge.getSourceNode(), forceVersion),
getCollectionVersionedId(applicationScope, markedEdge.getTargetNode(), forceVersion),
} else { // SimpleEdge
returnEdge = new SimpleEdge(getCollectionVersionedId(applicationScope, edge.getSourceNode(), forceVersion), edge.getType(),
getCollectionVersionedId(applicationScope, edge.getTargetNode(), forceVersion), edge.getTimestamp());
return returnEdge;
/**
 * Returns a copy of {@code searchEdge} whose node id is replaced by its collection-versioned form
 * (see {@code getCollectionVersionedId}).
 * <p>
 * An {@link IndexEdgeImpl} input is rebuilt as an {@link IndexEdgeImpl}. Any other search edge is rebuilt as a
 * {@link SearchEdgeImpl} carrying the original edge name and node type.
 * <p>
 * NOTE(review): the type test checks the concrete class {@code IndexEdgeImpl}, but the cast is to the
 * {@code IndexEdge} interface. Other {@code IndexEdge} implementations therefore take the SearchEdgeImpl branch.
 * Confirm this is intended.
 */
public SearchEdge getCollectionVersionedSearchEdge(final ApplicationScope applicationScope, final SearchEdge searchEdge, boolean forceVersion) {
SearchEdge returnSearchEdge;
if (searchEdge instanceof IndexEdgeImpl) {
IndexEdge indexEdge = (IndexEdge)searchEdge;
// NOTE(review): as written, this argument list stops after the node id with a trailing comma; verify the
// remaining IndexEdgeImpl constructor arguments against upstream.
returnSearchEdge = new IndexEdgeImpl(
getCollectionVersionedId(applicationScope, indexEdge.getNodeId(), forceVersion),
} else { // SearchEdgeImpl
returnSearchEdge = new SearchEdgeImpl(getCollectionVersionedId(applicationScope, searchEdge.getNodeId(), forceVersion),
searchEdge.getEdgeName(), searchEdge.getNodeType());
return returnSearchEdge;
public Observable<IndexOperationMessage> buildNewEdge( final ApplicationScope applicationScope, final Entity entity,
final Edge newEdge ) {
if (logger.isDebugEnabled()) {
logger.debug("Indexing in app scope {} with entity {} and new edge {}",
applicationScope, entity, newEdge);
return indexService.indexEdge( applicationScope, entity, newEdge );
/**
 * Deletes {@code edge} from the graph and gathers the de-index operations for it.
 * <p>
 * For each edge emitted by {@code GraphManager.deleteEdge}, every version of that edge's target node (read via
 * {@code getVersionsFromMaxToMin} starting from a freshly generated time UUID) yields an
 * {@code indexService.deIndexEdge} operation. These are presumably merged into {@code combined}.
 * <p>
 * NOTE(review): {@code combined} is only filled from inside {@code doOnNext} callbacks. It holds the results
 * only if the observable chains run to completion before {@code return combined}. As written, the terminal
 * (blocking) calls of the outer chains are not present, and the {@code .deIndexEdge(...)} line has no
 * receiver. Verify against upstream.
 *
 * @param applicationScope scope of the edge
 * @param edge             the edge to delete and de-index
 * @return the accumulated de-index operations
 */
public IndexOperationMessage buildDeleteEdge( final ApplicationScope applicationScope, final Edge
edge ) {
if (logger.isDebugEnabled()) {
logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
// Accumulator for the de-index operations gathered below; mutated from the doOnNext callbacks.
final IndexOperationMessage combined = new IndexOperationMessage();
gm.deleteEdge( edge )
.doOnNext( deletedEdge -> {
logger.debug("Processing deleted edge for de-indexing {}", deletedEdge);
// get ALL versions of the target node as any connection from this source node needs to be removed
ecm.getVersionsFromMaxToMin(deletedEdge.getTargetNode(), UUIDUtils.newTimeUUID())
.doOnNext(mvccLogEntry -> {
logger.debug("Adding edge {} mvccLogEntry {} to de-index batch", deletedEdge.getTargetNode(), mvccLogEntry);
.deIndexEdge(applicationScope, deletedEdge, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion())
.toBlocking().lastOrDefault(new IndexOperationMessage()));
return combined;
// Open question: does the queued entityDelete mark the entity and then immediately go on to deleteEntityIndex?
// It seems this may need to move higher up so that the marking isn't async -- or does it not matter?
/**
 * Deletes the marked versions of an entity and builds the de-index operations for the entity and its edges.
 * <p>
 * Steps, as written:
 * <ol>
 * <li>Find the first version (in {@code getVersionsFromMaxToMin} order) whose MVCC log entry state is
 * {@code DELETED}. If there is none, return an empty {@link IndexOperationMessage}.</li>
 * <li>Re-read the entity's versions and keep those whose timestamp is at or before that version. Delete them
 * via {@code ecm.delete} in batches of {@code serializationFig.getBufferSize()} and (presumably) collect them
 * into {@code logEntries}.</li>
 * <li>Compact the entity's node in the graph. For each marked edge emitted, add {@code deIndexEdge} operations
 * either for every version of the edge's target node (when, per the inline comments, the deleted entity is
 * the source) or for every collected version of the deleted entity (otherwise).</li>
 * </ol>
 * NOTE(review): several statements appear incomplete as written. Examples: the logging call fused into the
 * first null check, the {@code .filter} chain that follows a terminated statement, the {@code .deIndexEdge}
 * calls without a receiver, and the missing source-node test before {@code else}. Verify against upstream
 * before relying on the exact control flow.
 *
 * @param applicationScope scope of the entity
 * @param entityId         id of the entity being deleted
 * @return the combined de-index operations, or an empty message if no version is in the DELETED state
 */
public IndexOperationMessage buildEntityDelete(final ApplicationScope applicationScope, final Id entityId ) {
if (logger.isDebugEnabled()) {
logger.debug("Deleting entity id (marked versions) from index in app scope {} with entityId {}",
applicationScope, entityId);
final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
// First entry (in max-to-min order) whose state is DELETED, or null; versions at or before it are removed below.
MvccLogEntry mostRecentToDelete =
ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() )
.firstOrDefault( null, mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED );
//"mostRecent stage={} entityId={} version={} state={}",
// mostRecentToDelete.getStage().name(), mostRecentToDelete.getEntityId(),
// mostRecentToDelete.getVersion().toString(), mostRecentToDelete.getState().name());
// NOTE(review): the statement in this block looks like a logging call that lost its method name -- verify.
if (mostRecentToDelete == null) {"No entity versions to delete for id {}", entityId.toString());
// if nothing is marked, then abort
if(mostRecentToDelete == null){
return new IndexOperationMessage();
// Log entries of the versions deleted below; reused later to de-index edges that point at this entity.
final List<MvccLogEntry> logEntries = new ArrayList<>();
// NOTE(review): this re-reads the same version range already scanned for mostRecentToDelete; consider reusing it.
Observable<MvccLogEntry> mvccLogEntryListObservable =
ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() );
.filter( mvccLogEntry-> mvccLogEntry.getVersion().timestamp() <= mostRecentToDelete.getVersion().timestamp() )
.buffer( serializationFig.getBufferSize() )
.doOnNext( buffer -> ecm.delete( buffer ) )
.doOnNext(mvccLogEntries -> {
//"logEntries size={}", logEntries.size());
IndexOperationMessage combined = new IndexOperationMessage();
// do the edge deletes and build up de-index messages for each edge deleted
// assume we have "server1" and "region1" nodes in the graph with the following relationships (edges/connections):
// region1 -- zzzconnzzz|has --> server1
// server1 -- zzzconnzzz|in --> region1
// there will always be a relationship from the appId to each entity based on the entity type (collection):
// application -- zzzcollzzz|servers --> server1
// application -- zzzcollzzz|regions --> region1
// When deleting either "server1" or "region1" entity, the connections should get deleted and de-indexed along
// with the entry for the entity itself in the collection. The above example should have at minimum 3 things to
// be de-indexed. There may be more as either "server1" or "region1" could have multiple versions.
// Further comments using the example of deleting "server1" from the above example.
gm.compactNode(entityId).doOnNext(markedEdge -> {
if (logger.isDebugEnabled()) {
logger.debug("Processing deleted edge for de-indexing {}", markedEdge);
// if the edge was for a connection where the entity to be deleted is the source node, we need to load
// the target node's versions so that all versions of connections to that entity can be de-indexed
// server1 -- zzzconnzzz|in --> region1
// get ALL versions of the target node as any connection from this source node needs to be removed
ecm.getVersionsFromMaxToMin( markedEdge.getTargetNode(), UUIDUtils.newTimeUUID() )
.doOnNext(mvccLogEntry -> {
logger.debug("Adding edge {} mvccLogEntry {} to de-index batch", markedEdge, mvccLogEntry);
.deIndexEdge(applicationScope, markedEdge, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion())
.toBlocking().lastOrDefault(new IndexOperationMessage()));
}else {
// for each version of the entity being deleted, de-index the connections where the entity is the target
// node ( application -- zzzcollzzz|servers --> server1 ) or (region1 -- zzzconnzzz|has --> server1)
logEntries.forEach(logEntry -> {
logger.debug("Adding edge {} mvccLogEntry {} to de-index batch", markedEdge, logEntry);
.deIndexEdge(applicationScope, markedEdge, logEntry.getEntityId(), logEntry.getVersion())
.toBlocking().lastOrDefault(new IndexOperationMessage()));
return combined;
public Observable<IndexOperationMessage> buildEntityIndex( final EntityIndexOperation entityIndexOperation ) {
final ApplicationScope applicationScope = entityIndexOperation.getApplicationScope();
final Id entityId = entityIndexOperation.getId();
//load the entity
return entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId ).filter(
entity -> {
final Field<Long> modified = entity.getField( Schema.PROPERTY_MODIFIED );
* We don't have a modified field, so we can't check, pass it through
if ( modified == null ) {
return true;
//entityIndexOperation.getUpdatedSince will always be 0 except for reindexing the application
//only re-index if it has been updated and been updated after our timestamp
return modified.getValue() >= entityIndexOperation.getUpdatedSince();
} )
//perform indexing on the task scheduler and start it
.flatMap( entity -> indexService.indexEntity( applicationScope, entity ) );
public Observable<IndexOperationMessage> deIndexOldVersions( final ApplicationScope applicationScope,
final Id entityId, final UUID markedVersion ){
if (logger.isDebugEnabled()) {
logger.debug("Removing old versions of entity {} from index in app scope {}", entityId, applicationScope );
final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
return indexService.deIndexOldVersions( applicationScope, entityId,
getVersionsOlderThanOrEqualToMarked(ecm, entityId, markedVersion));
/**
 * Intended to collect the version UUIDs of {@code entityId} whose timestamp is at or before
 * {@code markedVersion}'s timestamp, reading versions starting from {@code markedVersion}.
 * <p>
 * NOTE(review): as written, the {@code if} body (presumably {@code versions.add(mvccLogEntry.getVersion())})
 * and the closing braces are not present. No 100-version limit is applied despite the comment below; confirm
 * whether a {@code take(100)} is intended. {@code forEach} on an rx {@code Observable} only fills
 * {@code versions} before {@code return} if the source emits synchronously (TODO confirm).
 *
 * @param ecm           collection manager used to read the entity's MVCC log
 * @param entityId      entity whose versions are read
 * @param markedVersion cutoff version
 * @return the selected version UUIDs
 */
private List<UUID> getVersionsOlderThanOrEqualToMarked(final EntityCollectionManager ecm,
final Id entityId, final UUID markedVersion ){
final List<UUID> versions = new ArrayList<>();
// only take last 100 versions to avoid eating memory. a tool can be built for massive cleanups for old usergrid
// clusters that do not have this in-line cleanup
ecm.getVersionsFromMaxToMin( entityId, markedVersion)
.forEach( mvccLogEntry -> {
if ( mvccLogEntry.getVersion().timestamp() <= markedVersion.timestamp() ) {
return versions;
/**
 * Intended to collect the version UUIDs of every MVCC log entry for {@code entityId}, reading from a freshly
 * generated time UUID downwards.
 * <p>
 * NOTE(review): as written, the {@code forEach} body and closing braces are not present. This private helper
 * is also not called anywhere in the visible part of the class; confirm it is still needed.
 *
 * @param ecm      collection manager used to read the entity's MVCC log
 * @param entityId entity whose versions are read
 * @return the collected version UUIDs
 */
private List<UUID> getAllVersions( final EntityCollectionManager ecm,
final Id entityId ) {
final List<UUID> versions = new ArrayList<>();
ecm.getVersionsFromMaxToMin(entityId, UUIDUtils.newTimeUUID())
.forEach( mvccLogEntry -> {
return versions;