| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.usergrid.persistence.graph.impl; |
| |
| |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.usergrid.persistence.core.metrics.MetricsFactory; |
| import org.apache.usergrid.persistence.core.metrics.ObservableTimer; |
| import org.apache.usergrid.persistence.core.rx.ObservableIterator; |
| import org.apache.usergrid.persistence.core.scope.ApplicationScope; |
| import org.apache.usergrid.persistence.core.util.ValidationUtils; |
| import org.apache.usergrid.persistence.graph.Edge; |
| import org.apache.usergrid.persistence.graph.GraphFig; |
| import org.apache.usergrid.persistence.graph.GraphManager; |
| import org.apache.usergrid.persistence.graph.MarkedEdge; |
| import org.apache.usergrid.persistence.graph.SearchByEdge; |
| import org.apache.usergrid.persistence.graph.SearchByEdgeType; |
| import org.apache.usergrid.persistence.graph.SearchByIdType; |
| import org.apache.usergrid.persistence.graph.SearchEdgeType; |
| import org.apache.usergrid.persistence.graph.SearchIdType; |
| import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteListener; |
| import org.apache.usergrid.persistence.graph.impl.stage.NodeDeleteListener; |
| import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization; |
| import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization; |
| import org.apache.usergrid.persistence.graph.serialization.NodeSerialization; |
| import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation; |
| import org.apache.usergrid.persistence.model.entity.Id; |
| import org.apache.usergrid.persistence.model.util.UUIDGenerator; |
| |
| import com.codahale.metrics.Timer; |
| import com.google.common.base.Optional; |
| import com.google.common.base.Preconditions; |
| import com.google.inject.Inject; |
| import com.netflix.astyanax.MutationBatch; |
| import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; |
| |
| import rx.Observable; |
| |
| |
| /** |
| * Implementation of graph edges |
| */ |
| public class GraphManagerImpl implements GraphManager { |
| |
| private static final Logger logger = LoggerFactory.getLogger( GraphManagerImpl.class ); |
| |
| private final ApplicationScope scope; |
| |
| private final EdgeMetadataSerialization edgeMetadataSerialization; |
| |
| |
| private final EdgeSerialization storageEdgeSerialization; |
| |
| private final NodeSerialization nodeSerialization; |
| |
| private final EdgeDeleteListener edgeDeleteListener; |
| private final NodeDeleteListener nodeDeleteListener; |
| private final Timer writeEdgeTimer; |
| private final Timer markEdgeTimer; |
| private final Timer markNodeTimer; |
| private final Timer loadEdgesFromSourceTimer; |
| private final Timer loadEdgesToTargetTimer; |
| private final Timer loadEdgesVersionsTimer; |
| private final Timer loadEdgesFromSourceByTypeTimer; |
| private final Timer loadEdgesToTargetByTypeTimer; |
| private final Timer getEdgeTypesFromSourceTimer; |
| private final Timer getIdTypesFromSourceTimer; |
| private final Timer getEdgeTypesToTargetTimer; |
| private final Timer getIdTypesToTargetTimer; |
| private final Timer deleteNodeTimer; |
| private final Timer deleteEdgeTimer; |
| |
| |
| private final GraphFig graphFig; |
| |
| |
| @Inject |
| public GraphManagerImpl( final EdgeMetadataSerialization edgeMetadataSerialization, |
| final EdgeSerialization storageEdgeSerialization, |
| final NodeSerialization nodeSerialization, final GraphFig graphFig, |
| final EdgeDeleteListener edgeDeleteListener, final NodeDeleteListener nodeDeleteListener, |
| final ApplicationScope scope, MetricsFactory metricsFactory ) { |
| |
| |
| ValidationUtils.validateApplicationScope( scope ); |
| Preconditions.checkNotNull( edgeMetadataSerialization, "edgeMetadataSerialization must not be null" ); |
| Preconditions.checkNotNull( storageEdgeSerialization, "storageEdgeSerialization must not be null" ); |
| Preconditions.checkNotNull( nodeSerialization, "nodeSerialization must not be null" ); |
| Preconditions.checkNotNull( graphFig, "consistencyFig must not be null" ); |
| Preconditions.checkNotNull( scope, "scope must not be null" ); |
| Preconditions.checkNotNull( nodeDeleteListener, "nodeDeleteListener must not be null" ); |
| |
| this.scope = scope; |
| this.edgeMetadataSerialization = edgeMetadataSerialization; |
| this.storageEdgeSerialization = storageEdgeSerialization; |
| this.nodeSerialization = nodeSerialization; |
| this.graphFig = graphFig; |
| this.edgeDeleteListener = edgeDeleteListener; |
| this.nodeDeleteListener = nodeDeleteListener; |
| |
| this.markNodeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "node.mark" ); |
| this.deleteNodeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "node.delete" ); |
| this.writeEdgeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "edge.write" ); |
| |
| this.markEdgeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "edge.mark" ); |
| this.deleteEdgeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "edge.delete" ); |
| this.loadEdgesFromSourceTimer = metricsFactory.getTimer( GraphManagerImpl.class, "edge.load_from" ); |
| this.loadEdgesToTargetTimer = metricsFactory.getTimer( GraphManagerImpl.class, "edge.load_to" ); |
| this.loadEdgesVersionsTimer = metricsFactory.getTimer( GraphManagerImpl.class, "edge.load_versions" ); |
| this.loadEdgesFromSourceByTypeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "edge.load_from_type" ); |
| this.loadEdgesToTargetByTypeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "edge.load_to_type" ); |
| this.getEdgeTypesFromSourceTimer = metricsFactory.getTimer( GraphManagerImpl.class, "edge.get_edge_from" ); |
| this.getEdgeTypesToTargetTimer = metricsFactory.getTimer( GraphManagerImpl.class, "edge.get_to" ); |
| |
| this.getIdTypesFromSourceTimer = metricsFactory.getTimer( GraphManagerImpl.class, "idtype.get_from" ); |
| this.getIdTypesToTargetTimer = metricsFactory.getTimer( GraphManagerImpl.class, "idtype.get_to" ); |
| |
| |
| } |
| |
| |
| @Override |
| public Observable<MarkedEdge> writeEdge( final Edge edge ) { |
| GraphValidation.validateEdge( edge ); |
| |
| final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, false ); |
| |
| final Observable<MarkedEdge> observable = Observable.just( markedEdge ).map( edge1 -> { |
| |
| final UUID timestamp = UUIDGenerator.newTimeUUID(); |
| |
| |
| final MutationBatch mutation = edgeMetadataSerialization.writeEdge( scope, edge1 ); |
| |
| final MutationBatch edgeMutation = storageEdgeSerialization.writeEdge( scope, edge1, timestamp ); |
| |
| mutation.mergeShallow( edgeMutation ); |
| |
| try { |
| mutation.execute(); |
| } |
| catch ( ConnectionException e ) { |
| throw new RuntimeException( "Unable to execute mutation", e ); |
| } |
| |
| return edge1; |
| } ); |
| |
| return ObservableTimer.time( observable, writeEdgeTimer ); |
| } |
| |
| |
| @Override |
| public Observable<MarkedEdge> markEdge( final Edge edge ) { |
| GraphValidation.validateEdge( edge ); |
| |
| final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, true ); |
| |
| final Observable<MarkedEdge> observable = Observable.just( markedEdge ).map( edge1 -> { |
| |
| final UUID timestamp = UUIDGenerator.newTimeUUID(); |
| |
| |
| final MutationBatch edgeMutation = storageEdgeSerialization.writeEdge( scope, edge1, timestamp ); |
| |
| |
| if (logger.isTraceEnabled()) { |
| logger.trace("Marking edge {} as deleted to commit log", edge1); |
| } |
| try { |
| edgeMutation.execute(); |
| } |
| catch ( ConnectionException e ) { |
| throw new RuntimeException( "Unable to execute mutation", e ); |
| } |
| |
| |
| return edge1; |
| } ); |
| |
| return ObservableTimer.time( observable, markEdgeTimer ); |
| } |
| |
| |
| @Override |
| public Observable<Edge> deleteEdge( final Edge edge ) { |
| |
| GraphValidation.validateEdge( edge ); |
| final UUID startTimestamp = UUIDGenerator.newTimeUUID(); |
| |
| |
| final Observable<Edge> observable = |
| Observable.create( new ObservableIterator<MarkedEdge>( "read edge versions" ) { |
| @Override |
| protected Iterator<MarkedEdge> getIterator() { |
| return storageEdgeSerialization.getEdgeVersions( scope, |
| new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), |
| Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent() ) ); |
| } |
| } ).filter( markedEdge -> markedEdge.isDeleted() ).flatMap( marked -> |
| //fire our delete listener and wait for the results |
| edgeDeleteListener.receive( scope, marked, startTimestamp ).doOnNext( |
| //log them |
| count -> logger.trace( "removed {} types for edge {} ", count, edge ) ) |
| //return the marked edge |
| .map( count -> marked ) ); |
| |
| |
| return ObservableTimer.time( observable, deleteEdgeTimer ); |
| } |
| |
| |
| @Override |
| public Observable<Id> markNode( final Id node, final long timestamp ) { |
| final Observable<Id> idObservable = Observable.just( node ).map( id -> { |
| |
| //mark the node as deleted |
| |
| final MutationBatch nodeMutation = nodeSerialization.mark( scope, id, timestamp ); |
| |
| |
| if (logger.isTraceEnabled()) { |
| logger.trace("Marking node {} as deleted to node mark", node); |
| } |
| try { |
| nodeMutation.execute(); |
| } |
| catch ( ConnectionException e ) { |
| throw new RuntimeException( "Unable to execute mutation", e ); |
| } |
| |
| |
| return id; |
| } ); |
| |
| return ObservableTimer.time( idObservable, markNodeTimer ); |
| } |
| |
| |
| @Override |
| public Observable<MarkedEdge> compactNode( final Id inputNode ) { |
| |
| final UUID startTime = UUIDGenerator.newTimeUUID(); |
| |
| final Observable<MarkedEdge> nodeObservable = |
| Observable.just( inputNode ) |
| .map( node -> nodeSerialization.getMaxVersion( scope, node ) ) |
| //.doOnNext(maxTimestamp -> logger.info("compactNode maxTimestamp={}", maxTimestamp.toString())) |
| .takeWhile(maxTimestamp -> maxTimestamp.isPresent() ) |
| //map our delete listener |
| .flatMap( timestamp -> nodeDeleteListener.receive( scope, inputNode, startTime ) ); |
| |
| return ObservableTimer.time( nodeObservable, this.deleteNodeTimer ); |
| } |
| |
| |
| @Override |
| public Observable<MarkedEdge> loadEdgeVersions( final SearchByEdge searchByEdge ) { |
| |
| final Observable<MarkedEdge> edges = |
| Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) { |
| @Override |
| protected Iterator<MarkedEdge> getIterator() { |
| return storageEdgeSerialization.getEdgeVersions( scope, searchByEdge ); |
| } |
| } ).buffer( graphFig.getScanPageSize() ) |
| .compose( new EdgeBufferFilter( searchByEdge.filterMarked() ) ); |
| |
| return ObservableTimer.time( edges, loadEdgesVersionsTimer ); |
| } |
| |
| |
| @Override |
| public Observable<MarkedEdge> loadEdgesFromSource( final SearchByEdgeType search ) { |
| final Observable<MarkedEdge> edges = |
| Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesFromSource" ) { |
| @Override |
| protected Iterator<MarkedEdge> getIterator() { |
| return storageEdgeSerialization.getEdgesFromSource( scope, search ); |
| } |
| } ).buffer( graphFig.getScanPageSize() ) |
| .compose( new EdgeBufferFilter( search.filterMarked() ) ); |
| |
| return ObservableTimer.time( edges, loadEdgesFromSourceTimer ); |
| } |
| |
| |
| @Override |
| public Observable<MarkedEdge> loadEdgesToTarget( final SearchByEdgeType search ) { |
| final Observable<MarkedEdge> edges = |
| Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesToTarget" ) { |
| @Override |
| protected Iterator<MarkedEdge> getIterator() { |
| return storageEdgeSerialization.getEdgesToTarget( scope, search ); |
| } |
| } ).buffer( graphFig.getScanPageSize() ) |
| .compose( new EdgeBufferFilter( search.filterMarked() ) ); |
| |
| |
| return ObservableTimer.time( edges, loadEdgesToTargetTimer ); |
| } |
| |
| |
| @Override |
| public Observable<MarkedEdge> loadEdgesFromSourceByType( final SearchByIdType search ) { |
| final Observable<MarkedEdge> edges = |
| Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesFromSourceByType" ) { |
| @Override |
| protected Iterator<MarkedEdge> getIterator() { |
| return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search ); |
| } |
| } ).buffer( graphFig.getScanPageSize() ) |
| .compose( new EdgeBufferFilter( search.filterMarked() ) ); |
| |
| return ObservableTimer.time( edges, loadEdgesFromSourceByTypeTimer ); |
| } |
| |
| |
| @Override |
| public Observable<MarkedEdge> loadEdgesToTargetByType( final SearchByIdType search ) { |
| final Observable<MarkedEdge> edges = |
| Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesToTargetByType" ) { |
| @Override |
| protected Iterator<MarkedEdge> getIterator() { |
| return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search ); |
| } |
| } ).buffer( graphFig.getScanPageSize() ) |
| .compose( new EdgeBufferFilter( search.filterMarked() ) ); |
| |
| return ObservableTimer.time( edges, loadEdgesToTargetByTypeTimer ); |
| } |
| |
| |
| @Override |
| public Observable<String> getEdgeTypesFromSource( final SearchEdgeType search ) { |
| final Observable<String> edgeTypes = |
| Observable.create( new ObservableIterator<String>( "getEdgeTypesFromSource" ) { |
| @Override |
| protected Iterator<String> getIterator() { |
| return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search ); |
| } |
| } ); |
| |
| return ObservableTimer.time( edgeTypes, getEdgeTypesFromSourceTimer ); |
| } |
| |
| |
| @Override |
| public Observable<String> getIdTypesFromSource( final SearchIdType search ) { |
| final Observable<String> edgeTypes = |
| Observable.create( new ObservableIterator<String>( "getIdTypesFromSource" ) { |
| @Override |
| protected Iterator<String> getIterator() { |
| return edgeMetadataSerialization.getIdTypesFromSource( scope, search ); |
| } |
| } ); |
| |
| return ObservableTimer.time( edgeTypes, getIdTypesFromSourceTimer ); |
| } |
| |
| |
| @Override |
| public Observable<String> getEdgeTypesToTarget( final SearchEdgeType search ) { |
| final Observable<String> edgeTypes = |
| Observable.create( new ObservableIterator<String>( "getEdgeTypesToTarget" ) { |
| @Override |
| protected Iterator<String> getIterator() { |
| return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search ); |
| } |
| } ); |
| |
| return ObservableTimer.time( edgeTypes, getEdgeTypesToTargetTimer ); |
| } |
| |
| |
| @Override |
| public Observable<String> getIdTypesToTarget( final SearchIdType search ) { |
| final Observable<String> edgeTypes = Observable.create( new ObservableIterator<String>( "getIdTypesToTarget" ) { |
| @Override |
| protected Iterator<String> getIterator() { |
| return edgeMetadataSerialization.getIdTypesToTarget( scope, search ); |
| } |
| } ); |
| |
| return ObservableTimer.time( edgeTypes, getIdTypesToTargetTimer ); |
| } |
| |
| |
| /** |
| * Helper filter to perform mapping and return an observable of pre-filtered edges |
| */ |
| private class EdgeBufferFilter implements |
| Observable.Transformer<List<MarkedEdge>, MarkedEdge> {//implements Func1<List<MarkedEdge>, |
| // Observable<MarkedEdge>> { |
| |
| private final boolean filterMarked; |
| |
| |
| private EdgeBufferFilter( final boolean filterMarked ) { |
| this.filterMarked = filterMarked; |
| } |
| |
| |
| @Override |
| |
| /** |
| * Takes a buffered list of marked edges. It then does a single round trip to fetch marked ids These are then |
| * used in conjunction with the max version filter to filter any edges that should not be returned |
| * |
| * @return An observable that emits only edges that can be consumed. There could be multiple versions of the |
| * same edge so those need de-duped. |
| */ public Observable<MarkedEdge> call( final Observable<List<MarkedEdge>> markedEdgesObservable ) { |
| |
| return markedEdgesObservable.flatMap( markedEdges -> { |
| |
| final Observable<MarkedEdge> markedEdgeObservable = Observable.from( markedEdges ); |
| |
| //We need to filter, perform that filter |
| final Map<Id, Long> markedVersions = nodeSerialization.getMaxVersions( scope, markedEdges ); |
| |
| return markedEdgeObservable.map( edge -> { |
| |
| /** |
| * Make sure we mark source and target deleted nodes as such |
| */ |
| |
| final long edgeTimestamp = edge.getTimestamp(); |
| |
| |
| final Long sourceTimestamp = markedVersions.get( edge.getSourceNode() ); |
| |
| //the source Id has been marked for deletion. It's version is <= to the marked version for |
| // deletion, |
| // so we need to discard it |
| final boolean isSourceDeleted = ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 ); |
| |
| final Long targetTimestamp = markedVersions.get( edge.getTargetNode() ); |
| |
| //the target Id has been marked for deletion. It's version is <= to the marked version for |
| // deletion, |
| // so we need to discard it |
| final boolean isTargetDeleted = ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 ); |
| |
| //one has been marked for deletion, return it |
| if(isSourceDeleted || isTargetDeleted){ |
| return new SimpleMarkedEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getTimestamp(), edge.isDeleted(), isSourceDeleted, isTargetDeleted ); |
| } |
| |
| return edge; |
| } ).filter( simpleMarkedEdge -> { |
| if(!filterMarked){ |
| return true; |
| } |
| |
| if(logger.isTraceEnabled()){ |
| logger.trace("Filtering edge {}", simpleMarkedEdge); |
| } |
| |
| //if any one of these is true, we filter it |
| return !simpleMarkedEdge.isDeleted() && !simpleMarkedEdge.isSourceNodeDelete() && !simpleMarkedEdge.isTargetNodeDeleted(); |
| }); |
| } ); |
| } |
| } |
| } |