/*
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing,
* * software distributed under the License is distributed on an
* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* * KIND, either express or implied. See the License for the
* * specific language governing permissions and limitations
* * under the License.
*
*/
package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import com.google.common.base.Optional;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import com.google.common.base.Preconditions;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.hash.PrimitiveSink;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
/**
 * Implementation of shard group compaction.  Audits shard groups and copies edges from the
 * source shards into the compaction target.
 */
@Singleton
public class ShardGroupCompactionImpl implements ShardGroupCompaction {
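    // number of audits this instance has performed, used in trace logging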
private final AtomicLong countAudits;
private static final Logger logger = LoggerFactory.getLogger( ShardGroupCompactionImpl.class );
    private static final Charset CHARSET = StandardCharsets.UTF_8;
private static final HashFunction MURMUR_128 = Hashing.murmur3_128();
private final ListeningExecutorService taskExecutor;
private final TimeService timeService;
private final GraphFig graphFig;
private final NodeShardAllocation nodeShardAllocation;
private final ShardedEdgeSerialization shardedEdgeSerialization;
private final EdgeColumnFamilies edgeColumnFamilies;
private final Keyspace keyspace;
private final EdgeShardSerialization edgeShardSerialization;
private final Random random;
private final ShardCompactionTaskTracker shardCompactionTaskTracker;
private final ShardAuditTaskTracker shardAuditTaskTracker;
private final NodeShardCache nodeShardCache;
@Inject
public ShardGroupCompactionImpl( final TimeService timeService, final GraphFig graphFig,
final NodeShardAllocation nodeShardAllocation,
final ShardedEdgeSerialization shardedEdgeSerialization,
final EdgeColumnFamilies edgeColumnFamilies, final Keyspace keyspace,
final EdgeShardSerialization edgeShardSerialization,
final AsyncTaskExecutor asyncTaskExecutor,
final NodeShardCache nodeShardCache ) {
this.timeService = timeService;
this.countAudits = new AtomicLong();
this.graphFig = graphFig;
this.nodeShardAllocation = nodeShardAllocation;
this.shardedEdgeSerialization = shardedEdgeSerialization;
this.edgeColumnFamilies = edgeColumnFamilies;
this.keyspace = keyspace;
this.edgeShardSerialization = edgeShardSerialization;
this.random = new Random();
this.shardCompactionTaskTracker = new ShardCompactionTaskTracker();
this.shardAuditTaskTracker = new ShardAuditTaskTracker();
this.taskExecutor = asyncTaskExecutor.getExecutorService();
this.nodeShardCache = nodeShardCache;
}
    /**
     * Execute the compaction task and return the status of the operations performed.
     *
     * @param group The shard entry group to compact
     *
     * @return The result of the compaction operation
     */
public CompactionResult compact( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
final ShardEntryGroup group ) {
final long startTime = timeService.getCurrentTime();
Preconditions.checkNotNull( group, "group cannot be null" );
        Preconditions.checkArgument( group.isCompactionPending(), "Compaction must be pending" );
Preconditions
.checkArgument( group.shouldCompact( startTime ), "Compaction cannot be run yet. Ignoring compaction." );
if(logger.isTraceEnabled()) {
logger.trace("Compacting shard group. Audit count is {} ", countAudits.get());
}
final CompactionResult.CompactionBuilder resultBuilder = CompactionResult.builder();
final Shard targetShard = group.getCompactionTarget();
final Set<Shard> sourceShards = new HashSet<>( group.getReadShards() );
//remove the target
sourceShards.remove( targetShard );
final UUID timestamp = UUIDGenerator.newTimeUUID();
final long newShardPivot = targetShard.getShardIndex();
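        // bound each copy/delete round trip by the configured scan page size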
final int maxWorkSize = graphFig.getScanPageSize();
        /**
         * Keep a running count of the edges as we move them
         */
long totalEdgeCount = 0;
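        // copy each source shard's edges into the target shard, deleting them from the source as we go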
for ( Shard sourceShard : sourceShards ) {
final MutationBatch newRowBatch = keyspace.prepareMutationBatch();
final MutationBatch deleteRowBatch = keyspace.prepareMutationBatch();
final MutationBatch updateShardMetaBatch = keyspace.prepareMutationBatch();
long edgeCount = 0;
Iterator<MarkedEdge> edges = edgeMeta
.loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, Collections.singleton( sourceShard ),
Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING );
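            // track the last edge we visited so the source shard's end bound can be persisted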
MarkedEdge shardEnd = null;
while ( edges.hasNext() ) {
final MarkedEdge edge = edges.next();
final long edgeTimestamp = edge.getTimestamp();
shardEnd = edge;
                /**
                 * The edge belongs to a different shard, stop copying
                 */
if ( edgeTimestamp < newShardPivot ) {
break;
}
newRowBatch.mergeShallow( edgeMeta
.writeEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, targetShard, edge,
timestamp ) );
deleteRowBatch.mergeShallow( edgeMeta
.deleteEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, sourceShard, edge,
timestamp ) );
edgeCount++;
// if we're at our count, execute the mutation of writing the edges to the new row, then remove them
// from the old rows
if ( edgeCount % maxWorkSize == 0 ) {
try {
// write the edges into the new shard atomically so we know they all succeed
newRowBatch.withAtomicBatch(true).execute();
                        // update the shard end after each batch so reads during the transition stay as close to current as possible
sourceShard.setShardEnd(
Optional.of(new DirectedEdge(shardEnd.getTargetNode(), shardEnd.getTimestamp()))
);
if(logger.isTraceEnabled()) {
logger.trace("Updating shard {} during batch removal with shardEnd {}", sourceShard, shardEnd);
}
updateShardMetaBatch.mergeShallow(
edgeShardSerialization.writeShardMeta(scope, sourceShard, edgeMeta));
                        // deliberately block this thread before deleting the old edges to be sure there are no gaps;
                        // duplicates are filtered out during graph seeking, so this is safe
Thread.sleep(1000);
if(logger.isTraceEnabled()) {
logger.trace("Deleting batch of {} from old shard", maxWorkSize);
}
deleteRowBatch.withAtomicBatch(true).execute();
updateShardMetaBatch.execute();
}
                    catch ( Throwable t ) {
                        logger.error( "Unable to move edges from shard {} to shard {}", sourceShard, targetShard, t );
                    }
totalEdgeCount += edgeCount;
edgeCount = 0;
}
}
totalEdgeCount += edgeCount;
try {
// write the edges into the new shard atomically so we know they all succeed
newRowBatch.withAtomicBatch(true).execute();
                // deliberately block this thread before deleting the old edges to be sure there are no gaps;
                // duplicates are filtered out during graph seeking, so this is safe
Thread.sleep(1000);
if(logger.isTraceEnabled()) {
logger.trace("Deleting remaining {} edges from old shard", edgeCount);
}
deleteRowBatch.withAtomicBatch(true).execute();
if (shardEnd != null){
sourceShard.setShardEnd(
Optional.of(new DirectedEdge(shardEnd.getTargetNode(), shardEnd.getTimestamp()))
);
if(logger.isTraceEnabled()) {
logger.trace("Updating for last time shard {} with shardEnd {}", sourceShard, shardEnd);
}
updateShardMetaBatch.mergeShallow( edgeShardSerialization.writeShardMeta(scope, sourceShard, edgeMeta));
updateShardMetaBatch.execute();
}
}
            catch ( Throwable t ) {
                logger.error( "Unable to move edges to target shard {}", targetShard, t );
            }
}
if (logger.isTraceEnabled()) {
logger.trace("Finished compacting {} shards and moved {} edges", sourceShards, totalEdgeCount);
}
resultBuilder.withCopiedEdges( totalEdgeCount ).withSourceShards( sourceShards ).withTargetShard( targetShard );
        /**
         * If we didn't move anything this pass, mark the shard as compacted.  If we did move
         * something, either we missed it on the first pass or someone is still not writing
         * exclusively to the target shard.
         */
if ( totalEdgeCount == 0 ) {
            // the source shards have been drained; remove any that can be safely deleted before
            // marking the target as compacted
final MutationBatch shardRemovalRollup = keyspace.prepareMutationBatch();
for ( Shard source : sourceShards ) {
//if we can't safely delete it, don't do so
if ( !group.canBeDeleted( source ) ) {
continue;
}
logger.info( "Source shards have been fully drained. Removing shard {}", source );
final MutationBatch shardRemoval = edgeShardSerialization.removeShardMeta( scope, source, edgeMeta );
shardRemovalRollup.mergeShallow( shardRemoval );
resultBuilder.withRemovedShard( source );
}
try {
shardRemovalRollup.execute();
// invalidate the shard cache so we can be sure that all read shards are up to date
nodeShardCache.invalidate(scope, edgeMeta);
}
catch ( ConnectionException e ) {
throw new RuntimeException( "Unable to connect to cassandra", e );
}
//Overwrite our shard index with a newly created one that has been marked as compacted
Shard compactedShard = new Shard( targetShard.getShardIndex(), timeService.getCurrentTime(), true );
compactedShard.setShardEnd(targetShard.getShardEnd());
if(logger.isTraceEnabled()) {
logger.trace("Shard has been fully compacted. Marking shard {} as compacted in Cassandra", compactedShard);
}
final MutationBatch updateMark = edgeShardSerialization.writeShardMeta( scope, compactedShard, edgeMeta );
try {
updateMark.execute();
// invalidate the shard cache so we can be sure that all read shards are up to date
nodeShardCache.invalidate(scope, edgeMeta);
}
catch ( ConnectionException e ) {
                throw new RuntimeException( "Unable to connect to Cassandra", e );
}
resultBuilder.withCompactedShard( compactedShard );
}
return resultBuilder.build();
}
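    /**
     * Evaluate the given shard group, probabilistically submitting an asynchronous audit task
     * when the random roll falls within the configured repair chance.  A rough caller-side
     * sketch (the {@code compaction} reference is assumed for illustration, not taken from
     * this file):
     *
     * <pre>{@code
     * ListenableFuture<AuditResult> future = compaction.evaluateShardGroup( scope, edgeMeta, group );
     * }</pre>
     */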
@Override
public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope scope,
final DirectedEdgeMeta edgeMeta,
final ShardEntryGroup group ) {
final double repairChance = random.nextDouble();
//don't audit, we didn't hit our chance
if ( repairChance > graphFig.getShardRepairChance() ) {
return Futures.immediateFuture( AuditResult.NOT_CHECKED );
}
countAudits.getAndIncrement();
if(logger.isTraceEnabled()) {
logger.trace("Auditing shard group {}. count is {} ", group, countAudits.get());
}
        /**
         * Try to submit.  Under back pressure we may not be able to submit; that's OK, it's
         * better to drop the audit than to overwhelm the system
         */
final ListenableFuture<AuditResult> future;
try {
future = taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) );
}
catch ( RejectedExecutionException ree ) {
// ignore, if this happens we don't care, we're saturated, we can check later
logger.info( "Rejected audit for shard of scope {} edge, meta {} and group {}", scope, edgeMeta, group );
return Futures.immediateFuture( AuditResult.NOT_CHECKED );
}
/**
* Log our success or failures for debugging purposes
*/
Futures.addCallback( future, new FutureCallback<AuditResult>() {
@Override
public void onSuccess( @Nullable final AuditResult result ) {
if (logger.isTraceEnabled()) {
logger.trace("Successfully completed audit of task {}", result);
}
}
@Override
public void onFailure( final Throwable t ) {
logger.error( "Unable to perform audit. Exception is ", t );
}
} );
return future;
}
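    /**
     * Task that audits a single shard group: allocates a new shard when no compaction is
     * pending, or runs the compaction when one is due
     */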
private final class ShardAuditTask implements Callable<AuditResult> {
private final ApplicationScope scope;
private final DirectedEdgeMeta edgeMeta;
private final ShardEntryGroup group;
public ShardAuditTask( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
final ShardEntryGroup group ) {
this.scope = scope;
this.edgeMeta = edgeMeta;
this.group = group;
}
@Override
public AuditResult call() throws Exception {
/**
* We don't have a compaction pending. Run an audit on the shards
*/
if ( !group.isCompactionPending() ) {
                /**
                 * Check whether we should allocate a new shard; skip if another thread is
                 * already auditing this group
                 */
if ( !shardAuditTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
return AuditResult.CHECKED_NO_OP;
}
try {
final boolean created = nodeShardAllocation.auditShard( scope, group, edgeMeta );
if ( !created ) {
return AuditResult.CHECKED_NO_OP;
}
}
finally {
shardAuditTaskTracker.complete( scope, edgeMeta, group );
}
return AuditResult.CHECKED_CREATED;
}
            /**
             * A compaction is pending; check the task tracker, then do the compaction
             */
if ( group.shouldCompact( timeService.getCurrentTime() ) ) {
/**
* It's already compacting, don't do anything
*/
if ( !shardCompactionTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
if(logger.isTraceEnabled()) {
logger.trace("Already compacting, won't compact group: {}", group);
}
return AuditResult.COMPACTING;
}
                /**
                 * We use a finally block because we always want to remove the task tracker entry
                 */
try {
CompactionResult result = compact( scope, edgeMeta, group );
if(logger.isTraceEnabled()) {
logger.trace("Compaction result for compaction of scope {} with edge meta data of {} and shard group {} is {}",
scope, edgeMeta, group, result);
}
}
finally {
shardCompactionTaskTracker.complete( scope, edgeMeta, group );
}
return AuditResult.COMPACTED;
}
//no op, there's nothing we need to do to this shard
return AuditResult.NOT_CHECKED;
}
}
/**
* Inner class used to track running tasks per instance
*/
private static abstract class TaskTracker {
private ConcurrentHashMap<Long, Boolean> runningTasks = new ConcurrentHashMap<>();
        /**
         * Record that a task for this group is running so other threads won't attempt to run it.
         * Returns true if the caller may start the task.
         */
public boolean canStartTask( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
ShardEntryGroup group ) {
final Long hash = doHash( scope, edgeMeta, group ).hash().asLong();
            final Boolean returned = runningTasks.putIfAbsent( hash, Boolean.TRUE );
            /**
             * If the value was absent, no other thread owns this task and the caller may start it
             */
            return returned == null;
}
/**
* Mark this entry group as complete
*/
public void complete( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, ShardEntryGroup group ) {
final long hash = doHash( scope, edgeMeta, group ).hash().asLong();
runningTasks.remove( hash );
}
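        /**
         * Hash the task identity (scope, edge meta and group) into a consistent value
         */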
protected abstract Hasher doHash( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta,
final ShardEntryGroup shardEntryGroup );
}
/**
* Task tracker for shard compaction
*/
private static final class ShardCompactionTaskTracker extends ShardAuditTaskTracker {
/**
* Hash our data into a consistent long
*/
@Override
protected Hasher doHash( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta,
final ShardEntryGroup shardEntryGroup ) {
final Hasher hasher = super.doHash( scope, directedEdgeMeta, shardEntryGroup );
// add the compaction target to the hash
final Shard compactionTarget = shardEntryGroup.getCompactionTarget();
hasher.putLong( compactionTarget.getShardIndex() );
return hasher;
}
}
/**
* Task tracker for shard audit
*/
private static class ShardAuditTaskTracker extends TaskTracker {
/**
* Hash our data into a consistent long
*/
        @Override
        protected Hasher doHash( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta,
                                 final ShardEntryGroup shardEntryGroup ) {
final Hasher hasher = MURMUR_128.newHasher();
addToHash( hasher, scope.getApplication() );
for ( DirectedEdgeMeta.NodeMeta nodeMeta : directedEdgeMeta.getNodes() ) {
addToHash( hasher, nodeMeta.getId() );
hasher.putInt( nodeMeta.getNodeType().getStorageValue() );
}
/**
* Add our edge type
*/
for ( String type : directedEdgeMeta.getTypes() ) {
hasher.putString( type, CHARSET );
}
return hasher;
}
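        /**
         * Mix the id's UUID bits and type string into the given sink
         */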
protected void addToHash( final PrimitiveSink into, final Id id ) {
final UUID nodeUuid = id.getUuid();
final String nodeType = id.getType();
into.putLong( nodeUuid.getMostSignificantBits() ).putLong( nodeUuid.getLeastSignificantBits() )
.putString( nodeType, CHARSET );
}
}
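    /**
     * Immutable result of a compaction run: the edges copied, the target shard, and the source,
     * removed and compacted shards involved.  A rough builder sketch (the variable names are
     * assumed for illustration):
     *
     * <pre>{@code
     * CompactionResult result = CompactionResult.builder()
     *     .withCopiedEdges( movedCount )
     *     .withTargetShard( targetShard )
     *     .withSourceShards( sourceShards )
     *     .build();
     * }</pre>
     */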
public static final class CompactionResult {
public final long copiedEdges;
public final Shard targetShard;
public final Set<Shard> sourceShards;
public final Set<Shard> removedShards;
public final Shard compactedShard;
private CompactionResult( final long copiedEdges, final Shard targetShard, final Set<Shard> sourceShards,
final Set<Shard> removedShards, final Shard compactedShard ) {
this.copiedEdges = copiedEdges;
this.targetShard = targetShard;
this.compactedShard = compactedShard;
this.sourceShards = Collections.unmodifiableSet( sourceShards );
this.removedShards = Collections.unmodifiableSet( removedShards );
}
        /**
         * Create a builder used to assemble the result
         */
public static CompactionBuilder builder() {
return new CompactionBuilder();
}
@Override
public String toString() {
return "CompactionResult{" +
"copiedEdges=" + copiedEdges +
", targetShard=" + targetShard +
", sourceShards=" + sourceShards +
", removedShards=" + removedShards +
", compactedShard=" + compactedShard +
'}';
}
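        /**
         * Builder for assembling a {@link CompactionResult}
         */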
public static final class CompactionBuilder {
private long copiedEdges;
private Shard targetShard;
private Set<Shard> sourceShards;
private Set<Shard> removedShards = new HashSet<>();
private Shard compactedShard;
public CompactionBuilder withCopiedEdges( final long copiedEdges ) {
this.copiedEdges = copiedEdges;
return this;
}
public CompactionBuilder withTargetShard( final Shard targetShard ) {
this.targetShard = targetShard;
return this;
}
public CompactionBuilder withSourceShards( final Set<Shard> sourceShards ) {
this.sourceShards = sourceShards;
return this;
}
public CompactionBuilder withRemovedShard( final Shard removedShard ) {
this.removedShards.add( removedShard );
return this;
}
public CompactionBuilder withCompactedShard( final Shard compactedShard ) {
this.compactedShard = compactedShard;
return this;
}
public CompactionResult build() {
return new CompactionResult( copiedEdges, targetShard, sourceShards, removedShards, compactedShard );
}
}
}
}