blob: 950962623f7d8297b23855a4a6d5a1b33d8991f8 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.usergrid.corepersistence.index;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.Schema;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
import org.apache.usergrid.persistence.index.CandidateResult;
import org.apache.usergrid.persistence.index.CandidateResults;
import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexBatch;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.IndexEdge;
import org.apache.usergrid.persistence.index.IndexFig;
import org.apache.usergrid.persistence.index.SearchEdge;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.utils.InflectionUtils;
import org.apache.usergrid.utils.JsonUtils;
import org.apache.usergrid.utils.UUIDUtils;
import com.codahale.metrics.Timer;
import rx.Observable;
import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createSearchEdgeFromSource;
import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource;
import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromTarget;
import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
* Implementation of the indexing service
public class IndexServiceImpl implements IndexService {
private static final Logger logger = LoggerFactory.getLogger( IndexServiceImpl.class );
private final GraphManagerFactory graphManagerFactory;
private final EntityIndexFactory entityIndexFactory;
private final MapManagerFactory mapManagerFactory;
private final IndexSchemaCacheFactory indexSchemaCacheFactory;
private final EdgesObservable edgesObservable;
private final IndexFig indexFig;
private final IndexLocationStrategyFactory indexLocationStrategyFactory;
private final Timer indexTimer;
private final Timer addTimer;
public IndexServiceImpl( final GraphManagerFactory graphManagerFactory, final EntityIndexFactory entityIndexFactory,
final MapManagerFactory mapManagerFactory,
final IndexSchemaCacheFactory indexSchemaCacheFactory,
final EdgesObservable edgesObservable, final IndexFig indexFig,
final IndexLocationStrategyFactory indexLocationStrategyFactory,
final MetricsFactory metricsFactory ) {
this.graphManagerFactory = graphManagerFactory;
this.entityIndexFactory = entityIndexFactory;
this.mapManagerFactory = mapManagerFactory;
this.edgesObservable = edgesObservable;
this.indexFig = indexFig;
this.indexLocationStrategyFactory = indexLocationStrategyFactory;
this.indexSchemaCacheFactory = indexSchemaCacheFactory;
this.indexTimer = metricsFactory.getTimer( IndexServiceImpl.class, "index.update_all");
this.addTimer = metricsFactory.getTimer( IndexServiceImpl.class, "index.add" );
public Observable<IndexOperationMessage> indexEntity( final ApplicationScope applicationScope,
final Entity entity ) {
//bootstrap the lower modules from their caches
final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
final EntityIndex ei = entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope));
final Id entityId = entity.getId();
//we always index in the target scope
final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( gm, entityId );
//we may have to index we're indexing from source->target here
final Observable<IndexEdge> sourceEdgesToIndex = edge -> generateScopeFromSource( edge ) );
//do our observable for batching
//try to send a whole batch if we can
final Observable<IndexOperationMessage> batches = sourceEdgesToIndex
.buffer(indexFig.getIndexBatchSize() )
//map into batches based on our buffer size
.flatMap( buffer -> Observable.from( buffer )
//collect results into a single batch
.collect( () -> ei.createBatch(), ( batch, indexEdge ) -> {
if (logger.isDebugEnabled()) {
logger.debug("adding edge {} to batch for entity {}", indexEdge, entity);
final Optional<Set<String>> fieldsToIndex = getFilteredStringObjectMap( indexEdge );
batch.index( indexEdge, entity ,fieldsToIndex);
} )
//return the future from the batch execution
.map( batch -> ) );
return ObservableTimer.time( batches, indexTimer );
public Observable<IndexOperationMessage> indexEdge( final ApplicationScope applicationScope, final Entity entity, final Edge edge ) {
final Observable<IndexOperationMessage> batches = Observable.just( edge ).map( observableEdge -> {
//if the node is the target node, generate our scope correctly
if ( edge.getTargetNode().equals( entity.getId() ) ) {
return generateScopeFromSource( edge );
throw new IllegalArgumentException("target not equal to entity + "+entity.getId());
} ).map( indexEdge -> {
final EntityIndex ei = entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
final EntityIndexBatch batch = ei.createBatch();
if (logger.isDebugEnabled()) {
logger.debug("adding edge {} to batch for entity {}", indexEdge, entity);
Optional<Set<String>> fieldsToIndex = getFilteredStringObjectMap( indexEdge );
batch.index( indexEdge, entity ,fieldsToIndex);
} );
return ObservableTimer.time( batches, addTimer );
* This method takes in an entity and flattens it out then begins the process to filter out
* properties that we do not want to index. Once flatted we parse through each property
* and verify that we want it to be indexed on. The set of default properties that will always be indexed are as follows.
* UUID - TYPE - MODIFIED - CREATED. Depending on the schema this may change. For instance, users will always require
* NAME, but the above four will always be taken in.
* @param indexEdge
* @return This returns a filtered map that contains the flatted properties of the entity. If there isn't a schema
* associated with the collection then return null ( and index the entity in its entirety )
private Optional<Set<String>> getFilteredStringObjectMap( final IndexEdge indexEdge ) {
Id mapOwner = new SimpleId( indexEdge.getNodeId().getUuid(), TYPE_APPLICATION );
final MapScope ms = CpNamingUtils.getEntityTypeMapScope( mapOwner );
MapManager mm = mapManagerFactory.createMapManager( ms );
Set<String> defaultProperties;
ArrayList fieldsToKeep;
String collectionName = CpNamingUtils.getCollectionNameFromEdgeName( indexEdge.getEdgeName() );
IndexSchemaCache indexSchemaCache = indexSchemaCacheFactory.getInstance( mm );
Optional<Map> collectionIndexingSchema = indexSchemaCache.getCollectionSchema( collectionName );
//If we do have a schema then parse it and add it to a list of properties we want to keep.Otherwise return.
if ( collectionIndexingSchema.isPresent()) {
Map jsonMapData = collectionIndexingSchema.get();
Schema schema = Schema.getDefaultSchema();
defaultProperties = schema.getRequiredProperties( collectionName );
fieldsToKeep = ( ArrayList ) jsonMapData.get( "fields" );
if(fieldsToKeep.contains( "*" )){
return Optional.absent();
// never add "none" because it has special meaning, "none" disables indexing for a type
defaultProperties.addAll( fieldsToKeep );
else {
return Optional.absent();
return Optional.of(defaultProperties);
//Steps to delete an IndexEdge.
//1.Take the search edge given and search for all the edges in elasticsearch matching that search edge
//2. Batch Delete all of those edges returned in the previous search.
//TODO: optimize loops further.
public Observable<IndexOperationMessage> deleteIndexEdge( final ApplicationScope applicationScope,
final Edge edge ) {
final Observable<IndexOperationMessage> batches =
Observable.just( edge ).map( edgeValue -> {
final EntityIndex ei = entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
EntityIndexBatch batch = ei.createBatch();
//review why generating the Scope from the Source and the target node makes sense.
final IndexEdge fromSource = generateScopeFromSource( edge );
final Id targetId = edge.getTargetNode();
CandidateResults targetEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromSource, targetId );
//1. Feed the observable the candidate results you got back. Since it now does the aggregation for you
// you don't need to worry about putting your code in a do while.
batch = deindexBatchIteratorResolver( fromSource, targetEdgesToBeDeindexed, batch );
final IndexEdge fromTarget = generateScopeFromTarget( edge );
final Id sourceId = edge.getSourceNode();
CandidateResults sourceEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromTarget, sourceId );
batch = deindexBatchIteratorResolver( fromTarget, sourceEdgesToBeDeindexed, batch );
} );
return ObservableTimer.time( batches, addTimer );
//This should look up the entityId and delete any documents with a timestamp that comes before
//The edges that are connected will be compacted away from the graph.
public Observable<IndexOperationMessage> deleteEntityIndexes( final ApplicationScope applicationScope,
final Id entityId, final UUID markedVersion ) {
//bootstrap the lower modules from their caches
final EntityIndex ei = entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
CandidateResults crs = ei.getAllEntityVersionsBeforeMarkedVersion( entityId, markedVersion );
//If we get no search results, its possible that something was already deleted or
//that it wasn't indexed yet. In either case we can't delete anything and return an empty observable..
if(crs.isEmpty()) {
return Observable.empty();
UUID timeUUID = UUIDUtils.isTimeBased(entityId.getUuid()) ? entityId.getUuid() : UUIDUtils.newTimeUUID();
//not actually sure about the timestamp but ah well. works.
SearchEdge searchEdge = createSearchEdgeFromSource( new SimpleEdge( applicationScope.getApplication(),
CpNamingUtils.getEdgeTypeFromCollectionName( InflectionUtils.pluralize( entityId.getType() ) ), entityId,
timeUUID.timestamp() ) );
final Observable<IndexOperationMessage> batches = Observable.from( crs )
//collect results into a single batch
.collect( () -> ei.createBatch(), ( batch, candidateResult ) -> {
if (logger.isDebugEnabled()) {
logger.debug("Deindexing on edge {} for entity {} added to batch", searchEdge, entityId);
batch.deindex( candidateResult );
} )
//return the future from the batch execution
.map( batch -> );
return ObservableTimer.time(batches, indexTimer);
* Takes in candidate results and uses the iterator to create batch commands
public EntityIndexBatch deindexBatchIteratorResolver(IndexEdge edge,CandidateResults edgesToBeDeindexed, EntityIndexBatch batch){
Iterator itr = edgesToBeDeindexed.iterator();
while( itr.hasNext() ) {
batch.deindex( edge, ( CandidateResult );
return batch;