blob: 8aa5cfc9ca6de05f17468366e88906351ca34798 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.usergrid.persistence.collection.mvcc.stage.delete;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
import org.apache.usergrid.persistence.core.rx.ObservableIterator;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.model.entity.Id;
import com.codahale.metrics.Timer;
import com.fasterxml.uuid.UUIDComparator;
import com.google.inject.Inject;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
/**
* Runs on an entity that as just be mark committed, and removes all unique values <= this entity
*/
public class UniqueCleanup
implements Observable.Transformer<CollectionIoEvent<MvccEntity>, CollectionIoEvent<MvccEntity>> {
private static final Logger logger = LoggerFactory.getLogger( UniqueCleanup.class );
private final Timer uniqueCleanupTimer;
private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
private final Keyspace keyspace;
private final SerializationFig serializationFig;
@Inject
public UniqueCleanup( final SerializationFig serializationFig,
final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
final Keyspace keyspace, final MetricsFactory metricsFactory ) {
this.serializationFig = serializationFig;
this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
this.keyspace = keyspace;
this.uniqueCleanupTimer = metricsFactory.getTimer( UniqueCleanup.class, "uniquecleanup.base" );
}
@Override
public Observable<CollectionIoEvent<MvccEntity>> call(
final Observable<CollectionIoEvent<MvccEntity>> collectionIoEventObservable ) {
final Observable<CollectionIoEvent<MvccEntity>> outputObservable =
collectionIoEventObservable.flatMap( mvccEntityCollectionIoEvent -> {
final Id entityId = mvccEntityCollectionIoEvent.getEvent().getId();
final ApplicationScope applicationScope = mvccEntityCollectionIoEvent.getEntityCollection();
final UUID entityVersion = mvccEntityCollectionIoEvent.getEvent().getVersion();
//if it's been deleted, we need to remove everything up to an inclusive of this version.
//if it has not, we want to delete everything < this version
final boolean isDeleted = !mvccEntityCollectionIoEvent.getEvent().getEntity().isPresent();
//TODO Refactor this logic into a a class that can be invoked from anywhere
//iterate all unique values
final Observable<CollectionIoEvent<MvccEntity>> uniqueValueCleanup =
Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) {
@Override
protected Iterator<UniqueValue> getIterator() {
return uniqueValueSerializationStrategy.getAllUniqueFields( applicationScope, entityId );
}
} )
//skip versions > the specified version
//TODO: does this emit for every version before the staticComparator?
.skipWhile( uniqueValue -> {
logger.debug( "Cleaning up version:{} in UniqueCleanup", entityVersion );
final UUID uniqueValueVersion = uniqueValue.getEntityVersion();
//TODO: should this be equals? That way we clean up the one marked as well
if(isDeleted){
return UUIDComparator.staticCompare( uniqueValueVersion, entityVersion ) > 0;
}
return UUIDComparator.staticCompare( uniqueValueVersion, entityVersion ) >= 0;
} )
//buffer our buffer size, then roll them all up in a single batch mutation
.buffer( serializationFig.getBufferSize() )
//roll them up
.doOnNext( uniqueValues -> {
final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
for ( UniqueValue value : uniqueValues ) {
logger
.debug( "Deleting value:{} from application scope: {} ", value, applicationScope );
uniqueCleanupBatch
.mergeShallow( uniqueValueSerializationStrategy.delete( applicationScope, value ) );
}
try {
uniqueCleanupBatch.execute();
}
catch ( ConnectionException e ) {
throw new RuntimeException( "Unable to execute batch mutation", e );
}
} ).lastOrDefault( Collections.emptyList() ).map( list -> mvccEntityCollectionIoEvent );
return ObservableTimer.time( uniqueValueCleanup, uniqueCleanupTimer );
} );
return outputObservable;
}
}