/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.usergrid.corepersistence.index;
import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.Schema;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.*;
import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
import org.apache.usergrid.persistence.index.*;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.map.MapManager;
import org.apache.usergrid.persistence.map.MapManagerFactory;
import org.apache.usergrid.persistence.map.MapScope;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.utils.InflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import java.util.*;
import static org.apache.usergrid.corepersistence.util.CpNamingUtils.*;
import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
/**
* Default implementation of the indexing service: translates an entity's graph edges into batched index and de-index operations.
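*
* <p>A minimal usage sketch (the {@code indexService}, {@code applicationScope} and {@code entity}
* references are assumed to be supplied by the caller's injection context; they are not defined here):</p>
* <pre>{@code
* Observable<IndexOperationMessage> batches = indexService.indexEntity( applicationScope, entity );
* IndexOperationMessage message = batches.toBlocking().lastOrDefault( null );
* }</pre>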
*/
@Singleton
public class IndexServiceImpl implements IndexService {
private static final Logger logger = LoggerFactory.getLogger( IndexServiceImpl.class );
private final GraphManagerFactory graphManagerFactory;
private final EntityIndexFactory entityIndexFactory;
private final MapManagerFactory mapManagerFactory;
private final CollectionSettingsFactory collectionSettingsFactory;
private final EdgesObservable edgesObservable;
private final IndexFig indexFig;
private final IndexLocationStrategyFactory indexLocationStrategyFactory;
private final Timer indexTimer;
private final Timer addTimer;
@Inject
public IndexServiceImpl( final GraphManagerFactory graphManagerFactory, final EntityIndexFactory entityIndexFactory,
final MapManagerFactory mapManagerFactory,
final CollectionSettingsFactory collectionSettingsFactory,
final EdgesObservable edgesObservable, final IndexFig indexFig,
final IndexLocationStrategyFactory indexLocationStrategyFactory,
final MetricsFactory metricsFactory ) {
this.graphManagerFactory = graphManagerFactory;
this.entityIndexFactory = entityIndexFactory;
this.mapManagerFactory = mapManagerFactory;
this.edgesObservable = edgesObservable;
this.indexFig = indexFig;
this.indexLocationStrategyFactory = indexLocationStrategyFactory;
this.collectionSettingsFactory = collectionSettingsFactory;
this.indexTimer = metricsFactory.getTimer( IndexServiceImpl.class, "index.update_all");
this.addTimer = metricsFactory.getTimer( IndexServiceImpl.class, "index.add" );
}
@Override
public Observable<IndexOperationMessage> indexEntity( final ApplicationScope applicationScope,
final Entity entity ) {
//bootstrap the lower modules from their caches
final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
final EntityIndex ei = entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope));
final Id entityId = entity.getId();
//we always index in the target scope
final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( gm, entityId, true);
//we're indexing from source->target here, so generate the index scope from each edge's source node
final Observable<IndexEdge> sourceEdgesToIndex = edgesToTarget.map( edge -> generateScopeFromSource( edge ) );
//do our observable for batching
//try to send a whole batch if we can
final Observable<IndexOperationMessage> batches = sourceEdgesToIndex
.buffer(indexFig.getIndexBatchSize() )
//map into batches based on our buffer size
.flatMap( buffer -> Observable.from( buffer )
//collect results into a single batch
.collect( () -> ei.createBatch(), ( batch, indexEdge ) -> {
if (logger.isDebugEnabled()) {
logger.debug("adding edge {} to batch for entity {}", indexEdge, entity);
}
final Optional<Set<String>> fieldsToIndex = getFilteredStringObjectMap( indexEdge );
batch.index( indexEdge, entity, fieldsToIndex );
} )
//return the future from the batch execution
.map( batch -> batch.build() ) );
return ObservableTimer.time( batches, indexTimer );
}
@Override
public Observable<IndexOperationMessage> indexEdge( final ApplicationScope applicationScope, final Entity entity, final Edge edge ) {
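// builds a single-edge index request; the supplied edge must point at the entity as its target node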
final Observable<IndexOperationMessage> batches = Observable.just( edge ).map( observableEdge -> {
//if the node is the target node, generate our scope correctly
if ( edge.getTargetNode().equals( entity.getId() ) ) {
return generateScopeFromSource( edge );
}
throw new IllegalArgumentException( "Edge target node is not equal to entity " + entity.getId() );
} ).map( indexEdge -> {
final EntityIndex ei = entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
final EntityIndexBatch batch = ei.createBatch();
if (logger.isDebugEnabled()) {
logger.debug("adding edge {} to batch for entity {}", indexEdge, entity);
}
final Optional<Set<String>> fieldsToIndex = getFilteredStringObjectMap( indexEdge );
batch.index( indexEdge, entity, fieldsToIndex );
return batch.build();
} );
return ObservableTimer.time( batches, addTimer );
}
/**
* Determines which of an entity's flattened properties should be indexed for the collection referenced by
* the given edge. The schema's required properties are always indexed: UUID, TYPE, CREATED and MODIFIED,
* plus any type-specific requirements (users, for instance, also require NAME). Any fields listed in the
* collection's indexing settings are added on top of those.
* @param indexEdge the edge whose collection name is used to look up the collection's indexing settings
* @return the set of property names to index, or {@link Optional#absent()} when the collection has no
* indexing schema (or the schema requests "all" fields), in which case the entity is indexed in its entirety
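*
* An illustrative sketch of the two "fields" shapes this method understands (the surrounding settings
* document is simplified here):
* <pre>{@code
* { "fields": "all" }                  // no filtering - index the whole entity
* { "fields": ["name", "address"] }    // index these fields plus the required properties
* }</pre>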
*/
private Optional<Set<String>> getFilteredStringObjectMap( final IndexEdge indexEdge ) {
Id owner = new SimpleId( indexEdge.getNodeId().getUuid(), TYPE_APPLICATION );
Set<String> defaultProperties;
String collectionName = CpNamingUtils.getCollectionNameFromEdgeName( indexEdge.getEdgeName() );
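// settings are scoped to the owning node (treated here as the application) and the collection name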
CollectionSettings collectionSettings =
collectionSettingsFactory.getInstance( new CollectionSettingsScopeImpl( owner, collectionName) );
Optional<Map<String, Object>> collectionIndexingSchema =
collectionSettings.getCollectionSettings( collectionName );
//If we do have a schema then parse it and add its fields to the set of properties we want to keep. Otherwise return absent so the entire entity is indexed.
if ( collectionIndexingSchema.isPresent()) {
Map jsonMapData = collectionIndexingSchema.get();
Schema schema = Schema.getDefaultSchema();
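// the schema's required properties (uuid, type, created, modified, plus name where required) are always indexed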
defaultProperties = schema.getRequiredProperties( collectionName );
Object fields = jsonMapData.get("fields");
// "all" means no filtering is required; index the entire entity
if ( fields instanceof String && "all".equalsIgnoreCase( (String) fields ) ) {
return Optional.absent();
}
// an explicit field list is indexed in addition to the required properties
if ( fields instanceof List ) {
defaultProperties.addAll( (List) fields );
}
} else {
return Optional.absent();
}
return Optional.of(defaultProperties);
}
// DO NOT USE THIS AS THE QUERY TO ES CAN CAUSE EXTREME LOAD
// TODO REMOVE THIS AND UPDATE THE TESTS TO NOT USE THIS METHOD
@Override
public Observable<IndexOperationMessage> deleteIndexEdge( final ApplicationScope applicationScope,
final Edge edge ) {
final Observable<IndexOperationMessage> batches =
Observable.just( edge ).map( edgeValue -> {
final EntityIndex ei = entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
EntityIndexBatch batch = ei.createBatch();
// TODO: review why generating the scope from both the source and the target node makes sense.
final IndexEdge fromSource = generateScopeFromSource( edge );
final Id targetId = edge.getTargetNode();
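// fetch every index document in the source-scoped index that still references the target node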
CandidateResults targetEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromSource, targetId );
// 1. Feed the candidate results into the batch resolver. It aggregates the results for us,
// so there is no need to wrap this code in a do/while loop.
batch = deindexBatchIteratorResolver( fromSource, targetEdgesToBeDeindexed, batch );
final IndexEdge fromTarget = generateScopeFromTarget( edge );
final Id sourceId = edge.getSourceNode();
CandidateResults sourceEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromTarget, sourceId );
batch = deindexBatchIteratorResolver( fromTarget, sourceEdgesToBeDeindexed, batch );
return batch.build();
} );
return ObservableTimer.time( batches, addTimer );
}
@Override
public Observable<IndexOperationMessage> deIndexEdge(final ApplicationScope applicationScope, final Edge edge,
final Id entityId, final UUID entityVersion){
if (logger.isTraceEnabled()) {
logger.trace("deIndexEdge edge={} entityId={} entityVersion={}", edge, entityId, entityVersion);
}
final EntityIndex ei = entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope));
final EntityIndexBatch entityBatch = ei.createBatch();
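// de-index only the specific entity version from the source-scoped index for this edge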
entityBatch.deindex(generateScopeFromSource( edge ), entityId, entityVersion);
return Observable.just(entityBatch.build());
}
@Override
public Observable<IndexOperationMessage> deIndexOldVersions(final ApplicationScope applicationScope,
final Id entityId,
final List<UUID> versions) {
final EntityIndex ei = entityIndexFactory.
createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
// use Long.MAX_VALUE in the search edge because this value is not used elsewhere in the lower-level de-indexing code
// previously .timestamp() was called on entityId, but some entities do not have type-1 UUIDs ( legacy data )
final SearchEdge searchEdgeFromSource = createSearchEdgeFromSource( new SimpleEdge( applicationScope.getApplication(),
CpNamingUtils.getEdgeTypeFromCollectionName( InflectionUtils.pluralize( entityId.getType() ) ), entityId,
Long.MAX_VALUE ) );
final EntityIndexBatch batch = ei.createBatch();
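// queue a de-index operation for each stale version of the entity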
versions.forEach( version -> {
batch.deindex(searchEdgeFromSource, entityId, version);
});
return Observable.just(batch.build());
}
/**
* Takes in candidate results and uses the iterator to create batch commands
*/
public EntityIndexBatch deindexBatchIteratorResolver( IndexEdge edge, CandidateResults edgesToBeDeindexed, EntityIndexBatch batch ) {
Iterator<CandidateResult> itr = edgesToBeDeindexed.iterator();
while ( itr.hasNext() ) {
batch.deindex( edge, itr.next() );
}
return batch;
}
}