/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. The ASF licenses this file to You
* under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. For additional information regarding
* copyright in this work, please see the NOTICE file in the top level
* directory of this distribution.
*/
package org.apache.usergrid.persistence.collection.mvcc.stage.write;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Session;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixThreadPoolProperties;

import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtils;
import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValueException;
import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesFig;
import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService;
import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.field.Field;
import org.apache.usergrid.persistence.model.util.EntityUtils;

import rx.functions.Action1;
/**
 * This phase executes all unique value verification on the MvccEntity. Unique
 * values are either reserved through the Akka-based UniqueValuesService or
 * verified against Cassandra using a write-first-then-read strategy.
 */
@Singleton
public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>> {
private static final Logger logger = LoggerFactory.getLogger( WriteUniqueVerify.class );
private ActorSystemFig actorSystemFig;
private UniqueValuesFig uniqueValuesFig;
private UniqueValuesService akkaUvService;
private final UniqueValueSerializationStrategy uniqueValueStrat;
    // defaults for the Hystrix replay thread pool; note that REPLAY_GROUP captures
    // these values when the class is initialized
    private static int uniqueVerifyPoolSize = 100;
    private static int uniqueVerifyTimeoutMillis = 5000;
protected final SerializationFig serializationFig;
protected final Keyspace keyspace;
protected final Session session;
private final CassandraConfig cassandraFig;
@Inject
    public WriteUniqueVerify(final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
final SerializationFig serializationFig,
final Keyspace keyspace,
final CassandraConfig cassandraFig,
final ActorSystemFig actorSystemFig,
final UniqueValuesFig uniqueValuesFig,
final UniqueValuesService akkaUvService,
final Session session ) {
this.keyspace = keyspace;
this.cassandraFig = cassandraFig;
this.actorSystemFig = actorSystemFig;
this.uniqueValuesFig = uniqueValuesFig;
this.akkaUvService = akkaUvService;
this.session = session;
        Preconditions.checkNotNull( uniqueValueSerializationStrategy, "uniqueValueSerializationStrategy is required" );
        Preconditions.checkNotNull( serializationFig, "serializationFig is required" );
        this.uniqueValueStrat = uniqueValueSerializationStrategy;
        this.serializationFig = serializationFig;

        // note: REPLAY_GROUP is built at class initialization, before this constructor
        // runs, so this assignment cannot change the pool size it already captured
        uniqueVerifyPoolSize = this.serializationFig.getUniqueVerifyPoolSize();
}
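    /**
     * Entry point for this write stage: delegates to the Akka-based reservation path
     * when the actor system is enabled, otherwise uses the Hystrix/CQL path.
     */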
@Override
public void call( final CollectionIoEvent<MvccEntity> ioevent ) {
if ( actorSystemFig != null && actorSystemFig.getEnabled() ) {
verifyUniqueFieldsAkka( ioevent );
} else {
verifyUniqueFields( ioevent );
}
}
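    /**
     * Reserves the entity's unique values through the Akka-backed UniqueValuesService;
     * a UniqueValueException is translated into the WriteUniqueVerifyException
     * expected by callers of this stage.
     */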
private void verifyUniqueFieldsAkka(CollectionIoEvent<MvccEntity> ioevent) {
MvccValidationUtils.verifyMvccEntityWithEntity( ioevent.getEvent() );
final MvccEntity mvccEntity = ioevent.getEvent();
final Entity entity = mvccEntity.getEntity().get();
final ApplicationScope applicationScope = ioevent.getEntityCollection();
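        // prefer the region on the event, then the configured authoritative region,
        // then this node's local region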
String region = ioevent.getRegion();
if ( region == null ) {
region = uniqueValuesFig.getAuthoritativeRegion();
}
if ( region == null ) {
region = actorSystemFig.getRegionLocal();
}
try {
akkaUvService.reserveUniqueValues( applicationScope, entity, mvccEntity.getVersion(), region );
} catch (UniqueValueException e) {
Map<String, Field> violations = new HashMap<>();
violations.put( e.getField().getName(), e.getField() );
throw new WriteUniqueVerifyException( mvccEntity, applicationScope, violations );
}
}
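    /**
     * Write-first verification: optimistically write each unique value with a TTL,
     * then read the values back to confirm that this entity still owns each field.
     */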
private void verifyUniqueFields(CollectionIoEvent<MvccEntity> ioevent) {
MvccValidationUtils.verifyMvccEntityWithEntity( ioevent.getEvent() );
final MvccEntity mvccEntity = ioevent.getEvent();
final Entity entity = mvccEntity.getEntity().get();
final ApplicationScope scope = ioevent.getEntityCollection();
final BatchStatement batch = new BatchStatement();
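        // unique value writes are accumulated into a single batch and executed
        // only after all pre-write checks pass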
//allocate our max size, worst case
final List<Field> uniqueFields = new ArrayList<>( entity.getFields().size() );
//
// Construct all the functions for verifying we're unique
//
        final Map<String, Field> preWriteUniquenessViolations = new HashMap<>();
        for ( final Field field : EntityUtils.getUniqueFields(entity)) {

            // write-first-then-read strategy: before batching the optimistic write,
            // check whether an existing entity already claims this unique value
final UniqueValue written = new UniqueValueImpl( field, mvccEntity.getId(), mvccEntity.getVersion() );
// don't use read repair on this pre-write check
// stronger consistency is extremely important here, more so than performance
UniqueValueSet set = uniqueValueStrat.load(scope, cassandraFig.getDataStaxReadConsistentCl(),
written.getEntityId().getType(), Collections.singletonList(written.getField()), false);
set.forEach(uniqueValue -> {
if(!uniqueValue.getEntityId().getUuid().equals(written.getEntityId().getUuid())){
if(logger.isTraceEnabled()){
logger.trace("Pre-write violation detected. Attempted write for unique value [{}={}] and " +
"entity id [{}], entity version [{}] conflicts with already existing entity id [{}], " +
"entity version [{}]",
written.getField().getName(),
written.getField().getValue().toString(),
written.getEntityId().getUuid(),
written.getEntityVersion(),
uniqueValue.getEntityId().getUuid(),
uniqueValue.getEntityVersion());
}
preWriteUniquenessViolations.put(field.getName(), field);
}
});
// only build the batch statement if we don't have a violation for the field
if( preWriteUniquenessViolations.get(field.getName()) == null) {
// use TTL in case something goes wrong before entity is finally committed
batch.add(uniqueValueStrat.writeCQL(scope, written, serializationFig.getTimeout()));
uniqueFields.add(field);
}
}
        if ( !preWriteUniquenessViolations.isEmpty() ) {
if(logger.isTraceEnabled()){
logger.trace("Pre-write unique violations found, raising exception before executing first write");
}
throw new WriteUniqueVerifyException(mvccEntity, scope, preWriteUniquenessViolations );
}
        // short-circuit: nothing to write or verify
        if ( uniqueFields.isEmpty() ) {
            return;
}
//perform the write
session.execute(batch);
        // re-read the unique values through a Hystrix-managed thread pool
        ConsistentReplayCommand cmd = new ConsistentReplayCommand(
            uniqueValueStrat, cassandraFig, scope, entity.getId().getType(), uniqueFields, entity );

        Map<String, Field> uniquenessViolations = cmd.execute();
        // we have violations, throw an exception
if ( !uniquenessViolations.isEmpty() ) {
throw new WriteUniqueVerifyException( mvccEntity, ioevent.getEntityCollection(), uniquenessViolations );
}
}
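    /**
     * Hystrix command that re-reads the unique values written by the batch and
     * reports any field whose winning entry belongs to a different entity.
     */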
    private static class ConsistentReplayCommand extends HystrixCommand<Map<String, Field>> {
private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
private final CassandraConfig fig;
private final ApplicationScope scope;
private final String type;
private final List<Field> uniqueFields;
private final Entity entity;
        public ConsistentReplayCommand( UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
                                        CassandraConfig fig, ApplicationScope scope, final String type,
                                        List<Field> uniqueFields, Entity entity ) {
super(REPLAY_GROUP);
this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
this.fig = fig;
this.scope = scope;
this.type = type;
this.uniqueFields = uniqueFields;
this.entity = entity;
}
@Override
protected Map<String, Field> run() throws Exception {
return executeStrategy(fig.getDataStaxReadCl());
}
@Override
protected Map<String, Field> getFallback() {
            // fall back with the same CL; the first execution can fail for many reasons, not just consistency problems
return executeStrategy(fig.getDataStaxReadCl());
}
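        /**
         * Loads the current unique values at the given consistency level and collects
         * every field whose stored entity id differs from the one we just wrote.
         */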
        public Map<String, Field> executeStrategy(ConsistencyLevel consistencyLevel){

            // load for verification, to make sure that what we wrote is the value that reads back;
            // don't read-repair on this read because our write-first strategy will introduce a duplicate
            final UniqueValueSet uniqueValues =
                uniqueValueSerializationStrategy.load( scope, consistencyLevel, type, uniqueFields, false);
final Map<String, Field> uniquenessViolations = new HashMap<>( uniqueFields.size() );
//loop through each field that was unique
for ( final Field field : uniqueFields ) {
final UniqueValue uniqueValue = uniqueValues.getValue( field.getName() );
if ( uniqueValue == null ) {
throw new RuntimeException(
String.format( "Could not retrieve unique value for field %s, unable to verify",
field.getName() ) );
}
final Id returnedEntityId = uniqueValue.getEntityId();
if ( !entity.getId().equals(returnedEntityId) ) {
if(logger.isTraceEnabled()) {
logger.trace("Violation occurred when verifying unique value [{}={}]. " +
"Returned entity id [{}] does not match expected entity id [{}]",
field.getName(), field.getValue().toString(),
returnedEntityId,
entity.getId()
);
}
uniquenessViolations.put( field.getName(), field );
}
}
return uniquenessViolations;
}
}
    /**
     * Command group used for the unique value verification commands. Note that this
     * setter is built when the class is initialized, so it captures the default
     * values of uniqueVerifyPoolSize and uniqueVerifyTimeoutMillis.
     */
private static final HystrixCommand.Setter
REPLAY_GROUP = HystrixCommand.Setter.withGroupKey( HystrixCommandGroupKey.Factory.asKey( "uniqueVerify" ) )
.andThreadPoolPropertiesDefaults(
HystrixThreadPoolProperties.Setter().withCoreSize( uniqueVerifyPoolSize ) )
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(uniqueVerifyTimeoutMillis));
}