blob: fc1382fb4870ca6c5fedf33c41bfe6405560e8e4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. The ASF licenses this file to You
* under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. For additional information regarding
* copyright in this work, please see the NOTICE file in the top level
* directory of this distribution.
*/
package org.apache.usergrid.persistence.collection.mvcc.stage.write;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Session;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValueException;
import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesFig;
import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.MvccLogEntry;
import org.apache.usergrid.persistence.collection.exception.WriteCommitException;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtils;
import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.field.Field;
import org.apache.usergrid.persistence.model.util.EntityUtils;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.functions.Func1;
/**
* This phase should invoke any finalization, and mark the entity as committed in the
* data store before returning
*/
@Singleton
public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, CollectionIoEvent<MvccEntity>> {
private static final Logger logger = LoggerFactory.getLogger( WriteCommit.class );
ActorSystemFig actorSystemFig;
UniqueValuesFig uniqueValuesFig;
UniqueValuesService akkaUvService;
@Inject
private UniqueValueSerializationStrategy uniqueValueStrat;
private final MvccLogEntrySerializationStrategy logEntryStrat;
private final MvccEntitySerializationStrategy entityStrat;
private final Session session;
@Inject
public WriteCommit( final MvccLogEntrySerializationStrategy logStrat,
final MvccEntitySerializationStrategy entryStrat,
final UniqueValueSerializationStrategy uniqueValueStrat,
final ActorSystemFig actorSystemFig,
final UniqueValuesFig uniqueValuesFig,
final UniqueValuesService akkaUvService,
final Session session ) {
Preconditions.checkNotNull( logStrat, "MvccLogEntrySerializationStrategy is required" );
Preconditions.checkNotNull( entryStrat, "MvccEntitySerializationStrategy is required" );
Preconditions.checkNotNull( uniqueValueStrat, "UniqueValueSerializationStrategy is required");
this.logEntryStrat = logStrat;
this.entityStrat = entryStrat;
this.uniqueValueStrat = uniqueValueStrat;
this.actorSystemFig = actorSystemFig;
this.uniqueValuesFig = uniqueValuesFig;
this.akkaUvService = akkaUvService;
this.session = session;
}
@Override
public CollectionIoEvent<MvccEntity> call( final CollectionIoEvent<MvccEntity> ioEvent ) {
return confirmUniqueFields( ioEvent );
}
private CollectionIoEvent<MvccEntity> confirmUniqueFields(CollectionIoEvent<MvccEntity> ioEvent) {
final MvccEntity mvccEntity = ioEvent.getEvent();
MvccValidationUtils.verifyMvccEntityWithEntity( mvccEntity );
final Id entityId = mvccEntity.getId();
final UUID version = mvccEntity.getVersion();
final ApplicationScope applicationScope = ioEvent.getEntityCollection();
//set the version into the entity
final Entity entity = mvccEntity.getEntity().get();
EntityUtils.setVersion( entity, version );
MvccValidationUtils.verifyMvccEntityWithEntity( ioEvent.getEvent() );
ValidationUtils.verifyTimeUuid( version ,"version" );
final MvccLogEntry startEntry =
new MvccLogEntryImpl( entityId, version, Stage.COMMITTED, MvccLogEntry.State.COMPLETE );
MutationBatch logMutation = logEntryStrat.write( applicationScope, startEntry );
// now get our actual insert into the entity data
MutationBatch entityMutation = entityStrat.write( applicationScope, mvccEntity );
// merge the 2 into 1 mutation
logMutation.mergeShallow( entityMutation );
// akkaFig may be null when this is called from JUnit tests
if ( actorSystemFig != null && actorSystemFig.getEnabled() ) {
String region = ioEvent.getRegion();
if ( region == null ) {
region = uniqueValuesFig.getAuthoritativeRegion();
}
if ( region == null ) {
region = actorSystemFig.getRegionLocal();
}
confirmUniqueFieldsAkka( mvccEntity, version, applicationScope, region );
} else {
confirmUniqueFields( mvccEntity, version, applicationScope, logMutation );
}
try {
logMutation.execute();
}
catch ( ConnectionException e ) {
logger.error( "Failed to execute write asynchronously ", e );
throw new WriteCommitException( mvccEntity, applicationScope,
"Failed to execute write asynchronously ", e );
}
return ioEvent;
}
private void confirmUniqueFields(
MvccEntity mvccEntity, UUID version, ApplicationScope scope, MutationBatch logMutation) {
final Entity entity = mvccEntity.getEntity().get();
// re-write the unique values but this time with no TTL
final BatchStatement uniqueBatch = new BatchStatement();
for ( Field field : EntityUtils.getUniqueFields(mvccEntity.getEntity().get()) ) {
UniqueValue written = new UniqueValueImpl( field, entity.getId(), version);
uniqueBatch.add(uniqueValueStrat.writeCQL(scope, written, -1 ));
logger.debug("Finalizing {} unique value {}", field.getName(), field.getValue().toString());
}
try {
logMutation.execute();
session.execute(uniqueBatch);
}
catch ( ConnectionException e ) {
logger.error( "Failed to execute write asynchronously ", e );
throw new WriteCommitException( mvccEntity, scope,
"Failed to execute write asynchronously ", e );
}
}
private void confirmUniqueFieldsAkka(
MvccEntity mvccEntity, UUID version, ApplicationScope scope, String region ) {
final Entity entity = mvccEntity.getEntity().get();
try {
akkaUvService.confirmUniqueValues( scope, entity, version, region );
} catch (UniqueValueException e) {
Map<String, Field> violations = new HashMap<>();
violations.put( e.getField().getName(), e.getField() );
throw new WriteUniqueVerifyException( mvccEntity, scope, violations );
}
}
}