| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl; |
| |
| |
| import java.util.Collections; |
| import java.util.Iterator; |
| |
| import org.apache.usergrid.persistence.graph.serialization.impl.shard.*; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.usergrid.persistence.core.consistency.TimeService; |
| import org.apache.usergrid.persistence.core.scope.ApplicationScope; |
| import org.apache.usergrid.persistence.core.util.ValidationUtils; |
| import org.apache.usergrid.persistence.graph.GraphFig; |
| import org.apache.usergrid.persistence.graph.MarkedEdge; |
| import org.apache.usergrid.persistence.graph.SearchByEdgeType; |
| import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException; |
| import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation; |
| |
| import com.google.common.base.Optional; |
| import com.google.common.base.Preconditions; |
| import com.google.inject.Inject; |
| import com.netflix.astyanax.MutationBatch; |
| import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; |
| import com.netflix.astyanax.util.TimeUUIDUtils; |
| |
| |
| /** |
| * Implementation of the node shard monitor and allocation |
| */ |
| public class NodeShardAllocationImpl implements NodeShardAllocation { |
| |
| |
| private static final Logger logger = LoggerFactory.getLogger( NodeShardAllocationImpl.class ); |
| |
| private final EdgeShardSerialization edgeShardSerialization; |
| private final EdgeColumnFamilies edgeColumnFamilies; |
| private final ShardedEdgeSerialization shardedEdgeSerialization; |
| private final TimeService timeService; |
| private final GraphFig graphFig; |
| private final ShardGroupCompaction shardGroupCompaction; |
| private final NodeShardCache nodeShardCache; |
| |
| |
| @Inject |
| public NodeShardAllocationImpl( final EdgeShardSerialization edgeShardSerialization, |
| final EdgeColumnFamilies edgeColumnFamilies, |
| final ShardedEdgeSerialization shardedEdgeSerialization, final TimeService timeService, |
| final GraphFig graphFig, final ShardGroupCompaction shardGroupCompaction, |
| final NodeShardCache nodeShardCache) { |
| this.edgeShardSerialization = edgeShardSerialization; |
| this.edgeColumnFamilies = edgeColumnFamilies; |
| this.shardedEdgeSerialization = shardedEdgeSerialization; |
| this.timeService = timeService; |
| this.graphFig = graphFig; |
| this.shardGroupCompaction = shardGroupCompaction; |
| this.nodeShardCache = nodeShardCache; |
| } |
| |
| |
| @Override |
| public Iterator<ShardEntryGroup> getShards( final ApplicationScope scope, final Optional<Shard> maxShardId, |
| final DirectedEdgeMeta directedEdgeMeta ) { |
| |
| ValidationUtils.validateApplicationScope( scope ); |
| Preconditions.checkNotNull( maxShardId, "maxShardId cannot be null" ); |
| GraphValidation.validateDirectedEdgeMeta( directedEdgeMeta ); |
| |
| Iterator<Shard> existingShards; |
| |
| //its a new node, it doesn't need to check cassandra, it won't exist |
| if ( isNewNode( directedEdgeMeta ) ) { |
| existingShards = Collections.singleton( Shard.MIN_SHARD ).iterator(); |
| } |
| |
| else { |
| existingShards = edgeShardSerialization.getShardMetaData( scope, maxShardId, directedEdgeMeta ); |
| |
| /** |
| * We didn't get anything out of cassandra, so we need to create the minimum shard |
| */ |
| if ( existingShards == null || !existingShards.hasNext() ) { |
| |
| final MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, Shard.MIN_SHARD, directedEdgeMeta ); |
| try { |
| batch.execute(); |
| } |
| catch ( ConnectionException e ) { |
| throw new RuntimeException( "Unable to connect to casandra", e ); |
| } |
| |
| existingShards = Collections.singleton( Shard.MIN_SHARD ).iterator(); |
| } |
| } |
| |
| return new ShardEntryGroupIterator( existingShards, graphFig.getShardMinDelta(), shardGroupCompaction, scope, |
| directedEdgeMeta ); |
| } |
| |
| |
| @Override |
| public boolean auditShard( final ApplicationScope scope, final ShardEntryGroup shardEntryGroup, |
| final DirectedEdgeMeta directedEdgeMeta ) { |
| |
| ValidationUtils.validateApplicationScope( scope ); |
| GraphValidation.validateShardEntryGroup( shardEntryGroup ); |
| GraphValidation.validateDirectedEdgeMeta( directedEdgeMeta ); |
| |
| Preconditions.checkNotNull( shardEntryGroup, "shardEntryGroup cannot be null" ); |
| |
| |
| /** |
| * Nothing to do, it's been created very recently, we don't create a new one |
| */ |
| if ( shardEntryGroup.isCompactionPending() ) { |
| if (logger.isTraceEnabled()) logger.trace( "Shard entry group {} is compacting, not auditing", shardEntryGroup ); |
| return false; |
| } |
| |
| //we can't allocate, we have more than 1 write shard currently. We need to compact first |
| if ( shardEntryGroup.entrySize() != 1 ) { |
| if (logger.isTraceEnabled()) logger.trace( "Shard entry group {} does not have 1 entry, not allocating", shardEntryGroup ); |
| return false; |
| } |
| |
| |
| /** |
| * Check the min shard in our system |
| */ |
| final Shard shard = shardEntryGroup.getMinShard(); |
| final long minTime = getMinTime(); |
| |
| |
| |
| if ( shard.getCreatedTime() >= minTime ) { |
| if (logger.isTraceEnabled()) logger.trace( "Shard entry group {} and shard {} is before the minimum created time of {}. Not allocating", shardEntryGroup, shard, minTime ); |
| return false; |
| } |
| |
| |
| /** |
| * Check out if we have a count for our shard allocation |
| */ |
| |
| |
| |
| final long shardSize = graphFig.getShardSize(); |
| |
| |
| |
| /** |
| * We want to allocate a new shard as close to the max value as possible. This way if we're filling up a |
| * shard rapidly, we split it near the head of the values. |
| * Further checks to this group will result in more splits, similar to creating a tree type structure and |
| * splitting each node. |
| * |
| * This means that the lower shard can be re-split later if it is still too large. We do the division to |
| * truncate |
| * to a split point < what our current max is that would be approximately be our pivot ultimately if we split |
| * from the |
| * lower bound and moved forward. Doing this will stop the current shard from expanding and avoid a point |
| * where we cannot |
| * ultimately compact to the correct shard size. |
| */ |
| |
| |
| /** |
| * Allocate the shard |
| */ |
| |
| final Iterator<MarkedEdge> edges = directedEdgeMeta |
| .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, Collections.singletonList(shard),0, |
| SearchByEdgeType.Order.ASCENDING ); |
| |
| |
| if ( !edges.hasNext() ) { |
| if (logger.isTraceEnabled()) logger.trace( |
| "Tried to allocate a new shard for edge meta data {}, but no max value could be found in that row", |
| directedEdgeMeta ); |
| return false; |
| } |
| |
| |
| MarkedEdge marked = null; |
| |
| /** |
| * Advance to the pivot point we should use. Once it's compacted, we can split again. |
| * We either want to take the first one (unlikely) or we take our total count - the shard size. |
| * If this is a negative number, we're approaching our max count for this shard, so the first |
| * element will suffice. |
| */ |
| |
| long edgeCount = 0; |
| for ( long i = 1; edges.hasNext(); i++ ) { |
| //we hit a pivot shard, set it since it could be the last one we encounter |
| if ( i % shardSize == 0 ) { |
| marked = edges.next(); |
| } |
| else { |
| edges.next(); |
| } |
| edgeCount++; |
| } |
| |
| |
| /** |
| * Sanity check in case we audit before we have a full shard |
| */ |
| if ( marked == null ) { |
| if (logger.isTraceEnabled()){ |
| logger.trace( "Shard {} in shard group {} not full, " + |
| "not splitting. Edge count: {}", shard, shardEntryGroup, edgeCount ); |
| } |
| return false; |
| } |
| |
| final long createTimestamp = timeService.getCurrentTime(); |
| |
| final Shard newShard = new Shard( marked.getTimestamp(), createTimestamp, false ); |
| |
| if(logger.isTraceEnabled()) { |
| logger.trace("Allocating new shard {} for edge meta {}", newShard, directedEdgeMeta); |
| } |
| |
| final MutationBatch batch = this.edgeShardSerialization.writeShardMeta( scope, newShard, directedEdgeMeta ); |
| |
| try { |
| batch.execute(); |
| |
| if(logger.isTraceEnabled()) { |
| logger.trace("Clearing shard cache"); |
| } |
| |
| // invalidate the shard cache so we can be sure that all read shards are up to date |
| nodeShardCache.invalidate(scope, directedEdgeMeta); |
| } |
| catch ( ConnectionException e ) { |
| throw new RuntimeException( "Unable to connect to casandra", e ); |
| } |
| |
| |
| return true; |
| } |
| |
| |
| @Override |
| public long getMinTime() { |
| |
| final long minimumAllowed = ( long ) (2.5 * graphFig.getShardCacheTimeout()); |
| |
| final long minDelta = graphFig.getShardMinDelta(); |
| |
| |
| if ( minDelta < minimumAllowed ) { |
| throw new GraphRuntimeException( String |
| .format( "You must configure the property %s to be >= 2 x %s. Otherwise you risk losing data", |
| GraphFig.SHARD_MIN_DELTA, GraphFig.SHARD_CACHE_TIMEOUT ) ); |
| } |
| |
| return timeService.getCurrentTime() - minDelta; |
| } |
| |
| |
| /** |
| * Return true if the node has been created within our timeout. If this is the case, we dont' need to check |
| * cassandra, we know it won't exist |
| */ |
| private boolean isNewNode( DirectedEdgeMeta directedEdgeMeta ) { |
| |
| |
| //The timeout is in milliseconds. Time for a time uuid is 1/10000 of a milli, so we need to get the units |
| // correct |
| final long timeoutDelta = graphFig.getShardCacheTimeout(); |
| |
| final long timeNow = timeService.getCurrentTime(); |
| |
| boolean isNew = true; |
| |
| for ( DirectedEdgeMeta.NodeMeta node : directedEdgeMeta.getNodes() ) { |
| |
| //short circuit |
| if ( !isNew || node.getId().getUuid().version() > 2 ) { |
| return false; |
| } |
| |
| final long uuidTime = TimeUUIDUtils.getTimeFromUUID( node.getId().getUuid() ); |
| |
| final long newExpirationTimeout = uuidTime + timeoutDelta; |
| |
| //our expiration is after our current time, treat it as new |
| isNew = isNew && newExpirationTimeout > timeNow; |
| } |
| |
| return isNew; |
| } |
| } |