/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.usergrid.persistence.graph.impl.stage;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.rx.ObservableIterator;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.SearchEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Func1;
/**
* Construct the asynchronous node delete from the queue
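*
* <p>A minimal usage sketch, assuming a Guice {@code injector} and an event-supplied
* {@code scope}, {@code nodeId} and {@code eventTimestamp} (in practice the instance is
* wired via the {@code @Inject} constructor below):</p>
* <pre>{@code
* NodeDeleteListener listener = injector.getInstance( NodeDeleteListenerImpl.class );
*
* //block until every marked edge attached to the node has been deleted
* int removedEdgeCount = listener.receive( scope, nodeId, eventTimestamp )
*                                .count()
*                                .toBlocking()
*                                .last();
* }</pre>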
*/
public class NodeDeleteListenerImpl implements NodeDeleteListener {
private static final Logger logger = LoggerFactory.getLogger( NodeDeleteListenerImpl.class );
private final NodeSerialization nodeSerialization;
private final EdgeSerialization storageSerialization;
private final EdgeMetadataSerialization edgeMetadataSerialization;
private final EdgeMetaRepair edgeMetaRepair;
private final GraphFig graphFig;
protected final Keyspace keyspace;
/**
* Wire the serialization dependencies
*/
@Inject
public NodeDeleteListenerImpl( final NodeSerialization nodeSerialization,
final EdgeMetadataSerialization edgeMetadataSerialization,
final EdgeMetaRepair edgeMetaRepair, final GraphFig graphFig,
final EdgeSerialization storageSerialization,
final Keyspace keyspace ) {
this.nodeSerialization = nodeSerialization;
this.storageSerialization = storageSerialization;
this.edgeMetadataSerialization = edgeMetadataSerialization;
this.edgeMetaRepair = edgeMetaRepair;
this.graphFig = graphFig;
this.keyspace = keyspace;
}
/**
* Removes this node from the graph.
*
* @param scope The scope of the application
* @param node The node that was deleted
* @param timestamp The timestamp of the event
*
* @return An observable that emits the marked edges that were removed with this node, where the
* node was either the source or the target
*/
@Override
public Observable<MarkedEdge> receive( final ApplicationScope scope, final Id node, final UUID timestamp ) {
return Observable.just( node )
//delete source and targets in parallel and merge them into a single observable
.flatMap( id -> {
final Optional<Long> maxVersion = nodeSerialization.getMaxVersion( scope, node );
if (logger.isTraceEnabled()) {
logger.trace("Node with id {} has max version of {}", node, maxVersion.orNull());
}
if ( !maxVersion.isPresent() ) {
return Observable.empty();
}
// do all the edge deletes and then remove the marked node, return all edges just deleted
return
doDeletes( node, scope, maxVersion.get(), timestamp ).doOnCompleted( () -> {
try {
nodeSerialization.delete( scope, node, maxVersion.get()).execute();
} catch ( ConnectionException e ) {
throw new RuntimeException( "Unable to connect to cassandra", e );
}
});
});
}
/**
* Perform the edge deletes for the node, then repair the edge meta data. Emits the marked
* edges that were deleted.
*/
private Observable<MarkedEdge> doDeletes( final Id node, final ApplicationScope scope, final long maxVersion,
final UUID eventTimestamp ) {
/**
* Note that while we're processing, returned edges could be moved from the commit log to
* storage. As a result, we need to issue a delete with the same version as the node delete
* on both the commit log and storage for results from the commit log. This ensures the
* edge is removed from both, regardless of the processing state of any other thread or node.
*/
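/**
* Concrete illustration, with hypothetical timestamps: if the node's max version is t5, an
* edge written at t3 is removed by this delete, while an edge re-written at t6 (after the
* node was marked) survives and remains readable
*/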
//get all edges pointing to the target node and buffer them into groups for deletion
Observable<MarkedEdge> targetEdges =
getEdgesTypesToTarget(scope, new SimpleSearchEdgeType(node, null, null))
.flatMap(edgeType -> Observable.create(new ObservableIterator<MarkedEdge>("getTargetEdges") {
@Override
protected Iterator<MarkedEdge> getIterator() {
return storageSerialization.getEdgesToTarget(scope,
new SimpleSearchByEdgeType(node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent(), false));
}
}));
//get all edges originating from the source node and buffer them into groups for deletion
Observable<MarkedEdge> sourceEdges =
getEdgesTypesFromSource(scope, new SimpleSearchEdgeType(node, null, null))
.flatMap(edgeType -> Observable.create(new ObservableIterator<MarkedEdge>("getSourceEdges") {
@Override
protected Iterator<MarkedEdge> getIterator() {
return storageSerialization.getEdgesFromSource(scope,
new SimpleSearchByEdgeType(node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent(), false));
}
}));
//merge both source and target edges into a single observable. We'll need to check them all regardless of order
return Observable.merge( targetEdges, sourceEdges )
//buffer and delete marked edges in batches of our scan page size so we make fewer trips to Cassandra
.buffer( graphFig.getScanPageSize() ).flatMap( markedEdges -> {
if (logger.isTraceEnabled()) {
logger.trace("Batching {} edges for node {} for deletion", markedEdges.size(), node);
}
final MutationBatch batch = keyspace.prepareMutationBatch();
Set<TargetPair> sourceNodes = new HashSet<>( markedEdges.size() );
Set<TargetPair> targetNodes = new HashSet<>( markedEdges.size() );
for ( MarkedEdge edge : markedEdges ) {
//delete the newest edge <= the version on the node delete
//we use the version specified on the delete purposefully. If these edges are re-written
//at a greater timestamp we want them to remain
batch.mergeShallow( storageSerialization.deleteEdge( scope, edge, eventTimestamp ) );
sourceNodes.add( new TargetPair( edge.getSourceNode(), edge.getType() ) );
targetNodes.add( new TargetPair( edge.getTargetNode(), edge.getType() ) );
}
try {
batch.execute();
}
catch ( ConnectionException e ) {
throw new RuntimeException( "Unable to connect to casandra", e );
}
//now delete the meta data
//delete both the source and target meta data in parallel for the edges we deleted in the
//previous step, but only if nothing else is using them. We purposefully do not schedule
//this on a new scheduler; we want it running on the i/o thread of the Observable that is
//emitting the edges
if (logger.isTraceEnabled()) {
logger.trace("About to audit {} source types", sourceNodes.size());
}
Observable<Integer> sourceMetaCleanup =
Observable.from( sourceNodes ).flatMap( targetPair -> edgeMetaRepair
.repairSources( scope, targetPair.id, targetPair.edgeType, maxVersion ) ).last();
if (logger.isTraceEnabled()) {
logger.trace("About to audit {} target types", targetNodes.size());
}
Observable<Integer> targetMetaCleanup =
Observable.from( targetNodes ).flatMap( targetPair -> edgeMetaRepair
.repairTargets( scope, targetPair.id, targetPair.edgeType, maxVersion ) ).last();
//run both the source/target edge type cleanup, then proceed
return Observable.merge( sourceMetaCleanup, targetMetaCleanup ).lastOrDefault( null )
.flatMap(integer -> Observable.from( markedEdges ));
} );
}
/**
* Get all existing edge types to the target node
*/
private Observable<String> getEdgesTypesToTarget( final ApplicationScope scope, final SearchEdgeType search ) {
return Observable.create( new ObservableIterator<String>( "getEdgeTypesToTarget" ) {
@Override
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
}
} );
}
/**
* Get all existing edge types from the source node
*/
private Observable<String> getEdgesTypesFromSource( final ApplicationScope scope, final SearchEdgeType search ) {
return Observable.create( new ObservableIterator<String>( "getEdgeTypesFromSource" ) {
@Override
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
}
} );
}
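/**
* Simple tuple of a node id and an edge type. The delete pass above collects these into
* Sets so that edge meta data cleanup runs at most once per distinct (node, edge type) pair
*/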
private static class TargetPair {
protected final Id id;
protected final String edgeType;
private TargetPair( final Id id, final String edgeType ) {
this.id = id;
this.edgeType = edgeType;
}
@Override
public boolean equals( final Object o ) {
if ( this == o ) {
return true;
}
if ( o == null || getClass() != o.getClass() ) {
return false;
}
final TargetPair that = ( TargetPair ) o;
if ( !edgeType.equals( that.edgeType ) ) {
return false;
}
if ( !id.equals( that.id ) ) {
return false;
}
return true;
}
@Override
public int hashCode() {
int result = id.hashCode();
result = 31 * result + edgeType.hashCode();
return result;
}
}
}