blob: 5bdf3b9156f201836dc62cdb6b442b2a71e4df06 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. The ASF licenses this file to You
* under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. For additional information regarding
* copyright in this work, please see the NOTICE file in the top level
* directory of this distribution.
*/
package org.apache.usergrid.persistence.collection.mvcc.stage.write;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import com.netflix.astyanax.model.ConsistencyLevel;
import com.netflix.hystrix.Hystrix;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableLifecycle;
import org.apache.usergrid.persistence.collection.util.EntityUtils;
import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtils;
import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.field.Field;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.functions.Action1;
/**
* This phase execute all unique value verification on the MvccEntity.
*/
@Singleton
public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>> {
private static final Logger LOG = LoggerFactory.getLogger( WriteUniqueVerify.class );
private final UniqueValueSerializationStrategy uniqueValueStrat;
protected final SerializationFig serializationFig;
protected final Keyspace keyspace;
private final CassandraConfig cassandraFig;
@Inject
public WriteUniqueVerify( final UniqueValueSerializationStrategy uniqueValueSerializiationStrategy,
final SerializationFig serializationFig, final Keyspace keyspace, final CassandraConfig cassandraFig ) {
this.keyspace = keyspace;
this.cassandraFig = cassandraFig;
Preconditions.checkNotNull( uniqueValueSerializiationStrategy, "uniqueValueSerializationStrategy is required" );
Preconditions.checkNotNull( serializationFig, "serializationFig is required" );
this.uniqueValueStrat = uniqueValueSerializiationStrategy;
this.serializationFig = serializationFig;
}
@Override
public void call( final CollectionIoEvent<MvccEntity> ioevent ) {
MvccValidationUtils.verifyMvccEntityWithEntity( ioevent.getEvent() );
final MvccEntity mvccEntity = ioevent.getEvent();
final Entity entity = mvccEntity.getEntity().get();
final CollectionScope scope = ioevent.getEntityCollection();
final MutationBatch batch = keyspace.prepareMutationBatch();
//allocate our max size, worst case
final List<Field> uniqueFields = new ArrayList<>( entity.getFields().size() );
//
// Construct all the functions for verifying we're unique
//
for ( final Field field : EntityUtils.getUniqueFields(entity)) {
// if it's unique, create a function to validate it and add it to the list of
// concurrent validations
// use write-first then read strategy
final UniqueValue written = new UniqueValueImpl( field, mvccEntity.getId(), mvccEntity.getVersion() );
// use TTL in case something goes wrong before entity is finally committed
final MutationBatch mb = uniqueValueStrat.write( scope, written, serializationFig.getTimeout() );
batch.mergeShallow( mb );
uniqueFields.add(field);
}
//short circuit nothing to do
if ( uniqueFields.size() == 0 ) {
return ;
}
//perform the write
try {
batch.execute();
}
catch ( ConnectionException ex ) {
throw new RuntimeException( "Unable to write to cassandra", ex );
}
// use simple thread pool to verify fields in parallel
ConsistentReplayCommand cmd = new ConsistentReplayCommand(uniqueValueStrat,cassandraFig,scope, uniqueFields,entity);
Map<String,Field> uniquenessViolations = cmd.execute();
cmd.getFailedExecutionException();
//We have violations, throw an exception
if ( !uniquenessViolations.isEmpty() ) {
throw new WriteUniqueVerifyException( mvccEntity, ioevent.getEntityCollection(), uniquenessViolations );
}
}
private static class ConsistentReplayCommand extends HystrixCommand<Map<String,Field>>{
private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
private final CassandraConfig fig;
private final CollectionScope scope;
private final List<Field> uniqueFields;
private final Entity entity;
public ConsistentReplayCommand(UniqueValueSerializationStrategy uniqueValueSerializationStrategy, CassandraConfig fig, CollectionScope scope, List<Field> uniqueFields, Entity entity){
super(REPLAY_GROUP);
this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
this.fig = fig;
this.scope = scope;
this.uniqueFields = uniqueFields;
this.entity = entity;
}
@Override
protected Map<String, Field> run() throws Exception {
return executeStrategy(fig.getReadCL());
}
@Override
protected Map<String, Field> getFallback() {
return executeStrategy(fig.getConsistentReadCL());
}
public Map<String, Field> executeStrategy(ConsistencyLevel consistencyLevel){
//allocate our max size, worst case
//now get the set of fields back
final UniqueValueSet uniqueValues;
try {
uniqueValues = uniqueValueSerializationStrategy.load( scope,consistencyLevel, uniqueFields );
}
catch ( ConnectionException e ) {
throw new RuntimeException( "Unable to read from cassandra", e );
}
final Map<String, Field> uniquenessViolations = new HashMap<>( uniqueFields.size() );
//loop through each field that was unique
for ( final Field field : uniqueFields ) {
final UniqueValue uniqueValue = uniqueValues.getValue( field.getName() );
if ( uniqueValue == null ) {
throw new RuntimeException(
String.format( "Could not retrieve unique value for field %s, unable to verify",
field.getName() ) );
}
final Id returnedEntityId = uniqueValue.getEntityId();
if ( !entity.getId().equals(returnedEntityId) ) {
uniquenessViolations.put( field.getName(), field );
}
}
return uniquenessViolations;
}
}
/**
* Command group used for realtime user commands
*/
public static final HystrixCommand.Setter
REPLAY_GROUP = HystrixCommand.Setter.withGroupKey(
HystrixCommandGroupKey.Factory.asKey( "user" ) ).andThreadPoolPropertiesDefaults(
HystrixThreadPoolProperties.Setter().withCoreSize( 1000 ) );
}