/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.usergrid.corepersistence.pipeline.read.search;
import java.util.*;
import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
import org.apache.usergrid.persistence.Schema;
import org.apache.usergrid.persistence.index.*;
import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.model.field.DistanceField;
import org.apache.usergrid.persistence.model.field.EntityObjectField;
import org.apache.usergrid.persistence.model.field.Field;
import org.apache.usergrid.persistence.model.field.StringField;
import org.apache.usergrid.persistence.model.field.value.EntityObject;
import org.apache.usergrid.utils.DateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.collection.EntitySet;
import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import com.fasterxml.uuid.UUIDComparator;
import com.google.common.base.Optional;
import com.google.inject.Inject;
import rx.Observable;
/**
* Loads the entities for incoming CandidateResult emissions and streams them downstream, performing
* internal buffering for efficiency. Note that not all entities may be emitted if our load crosses
* page boundaries. It is up to the collector to determine when to stop streaming entities.
*/
public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate>, FilterResult<Entity>> {
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
private final EntityIndexFactory entityIndexFactory;
private final IndexLocationStrategyFactory indexLocationStrategyFactory;
private final IndexProducer indexProducer;
@Inject
public CandidateEntityFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
final EntityIndexFactory entityIndexFactory,
final IndexLocationStrategyFactory indexLocationStrategyFactory,
final IndexProducer indexProducer
) {
this.entityCollectionManagerFactory = entityCollectionManagerFactory;
this.entityIndexFactory = entityIndexFactory;
this.indexLocationStrategyFactory = indexLocationStrategyFactory;
this.indexProducer = indexProducer;
}
@Override
public Observable<FilterResult<Entity>> call(
final Observable<FilterResult<Candidate>> candidateResultsObservable ) {
/*
* A bit kludgy from the old 1.0 -> 2.0 APIs. Refactor this as we clean up our lower levels and
* create new results objects.
*/
final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
final EntityCollectionManager entityCollectionManager =
entityCollectionManagerFactory.createCollectionManager( applicationScope );
final EntityIndex applicationIndex = entityIndexFactory
.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
boolean keepStaleEntries = pipelineContext.getKeepStaleEntries();
String query = pipelineContext.getQuery();
boolean isDirectQuery = pipelineContext.getParsedQuery().isDirectQuery();
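// direct query candidates (fetched by name or UUID rather than via the search index) arrive without
// resolved entity Ids; those are filled in below before loading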
//buffer candidates into pages so we can load each page in one network hop
final Observable<FilterResult<Entity>> searchIdSetObservable =
candidateResultsObservable.buffer( pipelineContext.getLimit() )
//load them
.flatMap( candidateResults -> {
if (isDirectQuery) {
// add ids for direct query
updateDirectQueryCandidateResults(entityCollectionManager, candidateResults);
}
//flatten to a list of ids to load
final Observable<List<Candidate>> candidates =
Observable.from(candidateResults)
.map(filterResultCandidate -> filterResultCandidate.getValue()).toList();
//load the ids
final Observable<FilterResult<Entity>> entitySetObservable =
candidates.flatMap(candidatesList -> {
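// the select field mappings are assumed to be the same for every candidate in the page,
// so take them from the first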
Collection<SelectFieldMapping> mappings = candidatesList.get(0).getFields();
Observable<EntitySet> entitySets = Observable.from(candidatesList)
.map(candidateEntry -> candidateEntry.getCandidateResult().getId()).toList()
.flatMap(idList -> entityCollectionManager.load(idList));
//now we have a collection, validate our candidate set is correct.
return entitySets.map(
entitySet -> new EntityVerifier(
applicationIndex.createBatch(), entitySet, candidateResults,indexProducer)
)
.doOnNext(entityCollector -> entityCollector.merge(keepStaleEntries, query,
isDirectQuery))
.flatMap(entityCollector -> Observable.from(entityCollector.getResults()))
.map(entityFilterResult -> {
final Entity entity = entityFilterResult.getValue();
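// if the query had a select clause, trim the entity down to the requested fields,
// renaming each source field to its target field name (with support for nested fields)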
if (mappings.size() > 0) {
Map<String,Field> fieldMap = new HashMap<String, Field>(mappings.size());
rx.Observable.from(mappings)
.filter(mapping -> {
if ( entity.getFieldMap().containsKey(mapping.getSourceFieldName())) {
return true;
}
String[] parts = mapping.getSourceFieldName().split("\\.");
return nestedFieldCheck( parts, entity.getFieldMap() );
})
.doOnNext(mapping -> {
Field field = entity.getField(mapping.getSourceFieldName());
if ( field != null ) {
field.setName( mapping.getTargetFieldName() );
fieldMap.put( mapping.getTargetFieldName(), field );
} else {
String[] parts = mapping.getSourceFieldName().split("\\.");
nestedFieldSet( fieldMap, parts, entity.getFieldMap() );
}
}).toBlocking().lastOrDefault(null);
entity.setFieldMap(fieldMap);
}
return entityFilterResult;
});
});
return entitySetObservable;
} );
//if we filter all our results, we want to continue to try the next page
return searchIdSetObservable;
}
/**
* Sets field in result map with support for nested fields via recursion.
*
* @param result The result map of filtered fields
* @param parts The parts of the field name (more than one if field is nested)
* @param fieldMap Map of fields of the object
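*
* For example, a source field name of "address.city" arrives as parts = ["address", "city"]; the
* "city" field is copied into an EntityObjectField named "address" in the result map.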
*/
private void nestedFieldSet( Map<String, Field> result, String[] parts, Map<String, Field> fieldMap) {
if ( parts.length > 0 ) {
if ( fieldMap.containsKey( parts[0] )) {
Field field = fieldMap.get( parts[0] );
if ( field instanceof EntityObjectField ) {
EntityObjectField eof = (EntityObjectField)field;
result.putIfAbsent( parts[0], new EntityObjectField( parts[0], new EntityObject() ) );
// recursion
nestedFieldSet(
((EntityObjectField)result.get( parts[0] )).getValue().getFieldMap(),
Arrays.copyOfRange(parts, 1, parts.length),
eof.getValue().getFieldMap());
} else {
result.put( parts[0], field );
}
}
}
}
/**
* Check to see if field should be included in filtered result with support for nested fields via recursion.
*
* @param parts The parts of the field name (more than one if field is nested)
* @param fieldMap Map of fields of the object
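*
* For example, for parts = ["address", "city"] this descends into the EntityObjectField "address"
* and checks whether its field map contains "city".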
*/
private boolean nestedFieldCheck( String[] parts, Map<String, Field> fieldMap) {
if ( parts.length > 0 ) {
if ( fieldMap.containsKey( parts[0] )) {
Field field = fieldMap.get( parts[0] );
if ( field instanceof EntityObjectField ) {
EntityObjectField eof = (EntityObjectField)field;
// recursion
return nestedFieldCheck( Arrays.copyOfRange(parts, 1, parts.length), eof.getValue().getFieldMap());
} else {
return true;
}
}
}
return false;
}
/**
* Update direct query candidates to add IDs.
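* Name-based lookups resolve the Id via the entity collection manager; UUID-based lookups construct
* a SimpleId directly. Names that don't exist resolve to a null Id.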
*/
private void updateDirectQueryCandidateResults(
EntityCollectionManager entityCollectionManager, List<FilterResult<Candidate>> candidatesList) {
for (FilterResult<Candidate> filterCandidate : candidatesList) {
Candidate candidate = filterCandidate.getValue();
CandidateResult candidateResult = candidate.getCandidateResult();
String entityType = candidateResult.getDirectEntityType();
Id entityId = null;
if (candidateResult.isDirectQueryName()) {
entityId = entityCollectionManager.getIdField( entityType,
new StringField( Schema.PROPERTY_NAME, candidateResult.getDirectEntityName() ) )
.toBlocking().lastOrDefault( null );
} else if (candidateResult.isDirectQueryUUID()) {
entityId = new SimpleId(candidateResult.getDirectEntityUUID(), entityType);
}
filterCandidate.getValue().getCandidateResult().setId(entityId);
}
}
/**
* Our collector for entities. Not quite a true collector, but it works within our operational
* flow, as this state is mutable and difficult to represent functionally.
*/
private static final class EntityVerifier {
private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
private List<FilterResult<Entity>> results = new ArrayList<>();
private final EntityIndexBatch batch;
private final List<FilterResult<Candidate>> candidateResults;
private final IndexProducer indexProducer;
private final EntitySet entitySet;
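// populated by merge(): the candidates that survive de-duplication and proceed to validation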
private List<FilterResult<Candidate>> dedupedCandidateResults = new ArrayList<>();
public EntityVerifier( final EntityIndexBatch batch, final EntitySet entitySet,
final List<FilterResult<Candidate>> candidateResults,
final IndexProducer indexProducer) {
this.batch = batch;
this.entitySet = entitySet;
this.candidateResults = candidateResults;
this.indexProducer = indexProducer;
this.results = new ArrayList<>( entitySet.size() );
}
/**
* Merge our candidates and our entity set into results
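* De-duplicates the candidates first (keeping only the latest version per entity Id for index-backed
* queries, or unique non-null Ids for direct queries), validates each survivor against the Cassandra
* entity set, and then flushes any accumulated de-index operations for index-backed queries.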
*/
public void merge(boolean keepStaleEntries, String query, boolean isDirectQuery) {
if (!isDirectQuery) {
filterDuplicateCandidates(query);
} else {
// remove direct query duplicates or missing entities (names that don't exist will have null ids)
Set<UUID> foundUUIDs = new HashSet<>();
for (FilterResult<Candidate> candidateFilterResult : candidateResults) {
Id id = candidateFilterResult.getValue().getCandidateResult().getId();
if (id != null) {
UUID uuid = id.getUuid();
if (!foundUUIDs.contains(uuid)) {
dedupedCandidateResults.add(candidateFilterResult);
foundUUIDs.add(uuid);
}
}
}
}
for (final FilterResult<Candidate> candidateResult : dedupedCandidateResults) {
validate(candidateResult, keepStaleEntries, query, isDirectQuery);
}
// no index requests made for direct query, so no need to modify index
if (!isDirectQuery) {
indexProducer.put(batch.build()).toBlocking().lastOrDefault(null); // want to rethrow if batch fails
}
}
public List<FilterResult<Entity>> getResults() {
return results;
}
public EntityIndexBatch getBatch() {
return batch;
}
// Helper function to convert a time-based UUID's timestamp into a java.util.Date
private Date UUIDTimeStampToDate(UUID uuid) {
long timeStamp = 0L;
// The UUID is supposed to be time-based, so the version should always be 1,
// but since this is only used for logging we don't want to throw an error if it is misused.
if (uuid.version() == 1) {
// this is the difference between midnight October 15, 1582 UTC and midnight January 1, 1970 UTC, in 100-nanosecond units
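// (141427 days between those dates * 86400 seconds/day * 10^7 hundred-nanosecond units per second)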
long epochDiff = 122192928000000000L;
// the UUID timestamp is in 100 nanosecond units.
// convert that to milliseconds
timeStamp = ((uuid.timestamp()-epochDiff)/10000);
}
return new Date(timeStamp);
}
// don't need to worry about whether we are keeping stale entries -- this will remove candidates that are
// older than others in the result set
private void filterDuplicateCandidates(String query) {
Map<Id, UUID> latestEntityVersions = new HashMap<>();
// walk through candidates and find latest version for each entityID
for ( final FilterResult<Candidate> filterResult : candidateResults ) {
final Candidate candidate = filterResult.getValue();
final CandidateResult candidateResult = candidate.getCandidateResult();
final Id candidateId = candidateResult.getId();
final UUID candidateVersion = candidateResult.getVersion();
UUID previousCandidateVersion = latestEntityVersions.get(candidateId);
if (previousCandidateVersion != null) {
// replace if newer
if (UUIDComparator.staticCompare(candidateVersion, previousCandidateVersion) > 0) {
latestEntityVersions.put(candidateId, candidateVersion);
}
} else {
latestEntityVersions.put(candidateId, candidateVersion);
}
}
// walk through candidates again, saving newest results and deindexing older
for ( final FilterResult<Candidate> filterResult : candidateResults ) {
final Candidate candidate = filterResult.getValue();
final CandidateResult candidateResult = candidate.getCandidateResult();
final Id candidateId = candidateResult.getId();
final UUID candidateVersion = candidateResult.getVersion();
final UUID latestCandidateVersion = latestEntityVersions.get(candidateId);
if (candidateVersion.equals(latestCandidateVersion)) {
// save candidate
dedupedCandidateResults.add(filterResult);
} else {
// deindex if not the current version in database
final MvccEntity entity = entitySet.getEntity( candidateId );
final UUID databaseVersion = entity.getVersion();
if (!candidateVersion.equals(databaseVersion)) {
Date candidateTimeStamp = UUIDTimeStampToDate(candidateVersion);
Date entityTimeStamp = UUIDTimeStampToDate(databaseVersion);
logger.warn( "Found old stale entity on edge {} for entityId {} Entity version {} ({}). Candidate version {} ({}). Will not be returned in result set. Query = [{}]",
candidate.getSearchEdge(),
entity.getId().getUuid(),
databaseVersion,
DateUtils.instance.formatIso8601Date(entityTimeStamp),
candidateVersion,
DateUtils.instance.formatIso8601Date(candidateTimeStamp),
query
);
final SearchEdge searchEdge = candidate.getSearchEdge();
batch.deindex(searchEdge, entity.getId(), candidateVersion);
}
}
}
}
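/**
* Compares a single candidate against the authoritative entity loaded from Cassandra: missing and
* deleted entities are dropped, stale candidates are de-indexed (unless keepStaleEntries is set),
* a candidate newer than Cassandra is skipped pending read repair, and anything else is added to
* the results.
*/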
private void validate( final FilterResult<Candidate> filterResult, boolean keepStaleEntries, String query,
boolean isDirectQuery) {
final Candidate candidate = filterResult.getValue();
final CandidateResult candidateResult = candidate.getCandidateResult();
final boolean isGeo = candidateResult instanceof GeoCandidateResult;
final SearchEdge searchEdge = candidate.getSearchEdge();
final Id candidateId = candidateResult.getId();
UUID candidateVersion = candidateResult.getVersion();
final MvccEntity entity = entitySet.getEntity( candidateId );
//doesn't exist warn and drop
if ( entity == null ) {
if (!isDirectQuery) {
logger.warn(
"Searched and received candidate with entityId {} and version {}, yet was not found in cassandra. Ignoring since this could be a region sync issue",
candidateId, candidateVersion);
} else {
logger.warn(
"Direct query for entityId {} was not found in cassandra, query=[{}]", candidateId, query);
}
//TODO trigger an audit after a fail count where we explicitly try to repair from other regions
return;
}
final UUID databaseVersion = entity.getVersion();
if (isDirectQuery) {
// use returned (latest) version for direct query
candidateVersion = databaseVersion;
}
final Id entityId = entity.getId();
// The entity is marked as deleted
if (!entity.getEntity().isPresent() || entity.getStatus() == MvccEntity.Status.DELETED ) {
// when updating entities, we don't delete all previous versions from ES so this action is expected
if(logger.isDebugEnabled()){
logger.debug( "Deindexing deleted entity on edge {} for entityId {} and version {}",
searchEdge, entityId, databaseVersion);
}
batch.deindex( searchEdge, entityId, candidateVersion );
return;
}
// entity exists and is newer than ES version, could be a missed or slow index event
if ( UUIDComparator.staticCompare(databaseVersion, candidateVersion) > 0 ) {
Date candidateTimeStamp = UUIDTimeStampToDate(candidateVersion);
Date entityTimeStamp = UUIDTimeStampToDate(databaseVersion);
logger.warn( "Found stale entity on edge {} for entityId {} Entity version {} ({}). Candidate version {} ({}). Will be returned in result set = {} Query = [{}]",
candidate.getSearchEdge(),
entity.getId().getUuid(),
databaseVersion,
DateUtils.instance.formatIso8601Date(entityTimeStamp),
candidateVersion,
DateUtils.instance.formatIso8601Date(candidateTimeStamp),
keepStaleEntries,
query
);
if (!keepStaleEntries) {
batch.deindex(searchEdge, entityId, candidateVersion);
return;
}
}
//ES is newer than Cassandra, which means we haven't repaired the record in Cassandra yet. We don't
//want to remove the ES record, since the read in Cassandra should cause a read repair; just ignore it.
if ( UUIDComparator.staticCompare( candidateVersion, databaseVersion ) > 0 ) {
logger.warn(
"Found a newer version in ES over cassandra for edge {} for entityId {} and version {}. Repair should be run",
searchEdge, entityId, databaseVersion);
//TODO trigger an audit after a fail count where we explicitly try to repair from other regions
return;
}
// add the result
final Entity returnEntity = entity.getEntity().get();
if(isGeo){
returnEntity.setField(new DistanceField(((GeoCandidateResult)candidateResult).getDistance()));
}
final Optional<EdgePath> parent = filterResult.getPath();
final FilterResult<Entity> toReturn = new FilterResult<>( returnEntity, parent );
results.add( toReturn );
}
}
}