blob: ecd0f5c457b03ff98317bb87f6ccdc715faedcf0 [file] [log] [blame]
package org.apache.usergrid.persistence.collection.serialization.impl;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.marshal.BooleanType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.usergrid.persistence.collection.EntitySet;
import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
import org.apache.usergrid.persistence.collection.exception.DataCorruptionException;
import org.apache.usergrid.persistence.collection.exception.EntityTooLargeException;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.EntityMap;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.EntityUtils;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;
* V3 Serialization Implementation
public class MvccEntitySerializationStrategyV3Impl implements MvccEntitySerializationStrategy {
public final static int VERSION = 1;
private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
private static final ScopedRowKeySerializer<Id> ROW_KEY_SER = new ScopedRowKeySerializer<>( ID_SER );
private static final MultiTenantColumnFamily<ScopedRowKey<Id>, Boolean> CF_ENTITY_DATA =
new MultiTenantColumnFamily<>( "Entity_Version_Data_V3", ROW_KEY_SER, BooleanSerializer.get() );
private static final Boolean COL_VALUE = Boolean.TRUE;
private final EntitySerializer entitySerializer;
private static final Logger log = LoggerFactory.getLogger( MvccEntitySerializationStrategyV3Impl.class );
protected final Keyspace keyspace;
protected final SerializationFig serializationFig;
protected final CassandraFig cassandraFig;
public MvccEntitySerializationStrategyV3Impl( final Keyspace keyspace, final SerializationFig serializationFig,
final CassandraFig cassandraFig, final MetricsFactory metricsFactory ) {
this.keyspace = keyspace;
this.serializationFig = serializationFig;
this.cassandraFig = cassandraFig;
this.entitySerializer = new EntitySerializer( serializationFig, metricsFactory );
public MutationBatch write( final ApplicationScope applicationScope, final MvccEntity entity ) {
Preconditions.checkNotNull( applicationScope, "applicationScope is required" );
Preconditions.checkNotNull( entity, "entity is required" );
final Id entityId = entity.getId();
final UUID version = entity.getVersion();
Optional<EntityMap> map = EntityMap.fromEntity(entity.getEntity());
ByteBuffer byteBuffer = entitySerializer.toByteBuffer(
new EntityWrapper(entityId,entity.getVersion(), entity.getStatus(), map.isPresent() ? map.get() : null, 0 )
return doWrite( applicationScope, entityId, version, colMutation -> colMutation.putColumn( COL_VALUE, byteBuffer ) );
public EntitySet load( final ApplicationScope applicationScope, final Collection<Id> entityIds,
final UUID maxVersion ) {
Preconditions.checkNotNull( applicationScope, "applicationScope is required" );
Preconditions.checkNotNull( entityIds, "entityIds is required" );
Preconditions.checkArgument( entityIds.size() > 0, "entityIds is required" );
Preconditions.checkNotNull( maxVersion, "version is required" );
if ( entityIds.size() > serializationFig.getMaxLoadSize() ) {
throw new IllegalArgumentException(
"requested load size cannot be over configured maximum of " + serializationFig.getMaxLoadSize() );
final Id applicationId = applicationScope.getApplication();
final List<ScopedRowKey<Id>> rowKeys = new ArrayList<>( entityIds.size() );
for ( final Id entityId : entityIds ) {
final ScopedRowKey<Id> rowKey =
ScopedRowKey.fromKey( applicationId, entityId );
rowKeys.add( rowKey );
* Our settings may mean we exceed our maximum thrift buffer size. If we do, we have to make multiple
* requests, not just one.
* Perform the calculations and the appropriate request patterns
final int maxEntityResultSizeInBytes = serializationFig.getMaxEntitySize() * entityIds.size();
//if we're less than 1, set the number of requests to 1
final int numberRequests = Math.max( 1, maxEntityResultSizeInBytes / cassandraFig.getThriftBufferSize() );
final int entitiesPerRequest = entityIds.size() / numberRequests;
final Scheduler scheduler;
//if it's a single request, run it on the same thread
if ( numberRequests == 1 ) {
scheduler = Schedulers.immediate();
//if it's more than 1 request, run them on the I/O scheduler
else {
scheduler =;
final EntitySetImpl entitySetResults = Observable.from( rowKeys )
//buffer our entities per request, then for that buffer, execute the query in parallel (if neccessary)
.buffer( entitiesPerRequest ).flatMap( listObservable -> {
//here, we execute our query then emit the items either in parallel, or on the current thread
// if we have more than 1 request
return Observable.just( listObservable ).map( scopedRowKeys -> {
try {
return keyspace.prepareQuery( CF_ENTITY_DATA ).getKeySlice( scopedRowKeys )
.withColumnSlice( COL_VALUE ).execute().getResult();
catch ( ConnectionException e ) {
throw new CollectionRuntimeException( null, applicationScope,
"An error occurred connecting to cassandra", e );
} ).subscribeOn( scheduler );
}, 10 ).collect( () -> new EntitySetImpl( entityIds.size() ), ( ( entitySet, rows ) -> {
final Iterator<Row<ScopedRowKey<Id>, Boolean>> latestEntityColumns = rows.iterator();
while ( latestEntityColumns.hasNext() ) {
final Row<ScopedRowKey<Id>, Boolean> row =;
final ColumnList<Boolean> columns = row.getColumns();
if ( columns.size() == 0 ) {
final Id entityId = row.getKey().getKey();
final Column<Boolean> column = columns.getColumnByIndex( 0 );
final MvccEntity parsedEntity =
new MvccColumnParser( entityId, entitySerializer ).parseColumn( column );
entitySet.addEntity( parsedEntity );
} ) ).toBlocking().last();
return entitySetResults;
public Iterator<MvccEntity> loadDescendingHistory( final ApplicationScope applicationScope, final Id entityId,
final UUID version, final int fetchSize ) {
Preconditions.checkNotNull( applicationScope, "applicationScope is required" );
Preconditions.checkNotNull( entityId, "entity id is required" );
Preconditions.checkNotNull( version, "version is required" );
Preconditions.checkArgument( fetchSize > 0, "max Size must be greater than 0" );
throw new UnsupportedOperationException( "This version does not support loading history" );
public Iterator<MvccEntity> loadAscendingHistory( final ApplicationScope applicationScope, final Id entityId,
final UUID version, final int fetchSize ) {
Preconditions.checkNotNull( applicationScope, "applicationScope is required" );
Preconditions.checkNotNull( entityId, "entity id is required" );
Preconditions.checkNotNull( version, "version is required" );
Preconditions.checkArgument( fetchSize > 0, "max Size must be greater than 0" );
throw new UnsupportedOperationException( "This version does not support loading history" );
public Optional<MvccEntity> load( final ApplicationScope scope, final Id entityId ) {
final EntitySet results = load( scope, Collections.singleton( entityId ), UUIDGenerator.newTimeUUID() );
return Optional.fromNullable( results.getEntity( entityId ));
public MutationBatch mark( final ApplicationScope applicationScope, final Id entityId, final UUID version ) {
Preconditions.checkNotNull(applicationScope, "applicationScope is required");
Preconditions.checkNotNull(entityId, "entity id is required");
Preconditions.checkNotNull(version, "version is required");
return doWrite(applicationScope, entityId, version, colMutation ->
entitySerializer.toByteBuffer(new EntityWrapper(entityId, version, MvccEntity.Status.DELETED, null, 0))
public MutationBatch delete( final ApplicationScope applicationScope, final Id entityId, final UUID version ) {
Preconditions.checkNotNull( applicationScope, "applicationScope is required" );
Preconditions.checkNotNull( entityId, "entity id is required" );
Preconditions.checkNotNull( version, "version is required" );
return doWrite( applicationScope, entityId, version, colMutation -> colMutation.deleteColumn( Boolean.TRUE ) );
public java.util.Collection getColumnFamilies() {
//create the CF entity data. We want it reversed b/c we want the most recent version at the top of the
//row for fast seeks
MultiTenantColumnFamilyDefinition cf =
new MultiTenantColumnFamilyDefinition( CF_ENTITY_DATA, BytesType.class.getSimpleName(),
BooleanType.class.getSimpleName() ,
BytesType.class.getSimpleName(), MultiTenantColumnFamilyDefinition.CacheOption.KEYS );
return Collections.singleton( cf );
public Collection<TableDefinition> getTables() {
return Collections.emptyList();
* Do the write on the correct row for the entity id with the operation
private MutationBatch doWrite( final ApplicationScope applicationScope, final Id entityId, final UUID version, final RowOp op ) {
final MutationBatch batch = keyspace.prepareMutationBatch();
final Id applicationId = applicationScope.getApplication();
final ScopedRowKey<Id> rowKey =
ScopedRowKey.fromKey( applicationId, entityId );
final long timestamp = version.timestamp();
op.doOp( batch.withRow( CF_ENTITY_DATA, rowKey ).setTimestamp( timestamp ) );
return batch;
public int getImplementationVersion() {
return CollectionDataVersions.LOG_REMOVAL.getVersion();
* Converts raw columns the to MvccEntity representation
private static final class MvccColumnParser implements ColumnParser<Boolean, MvccEntity> {
private final Id id;
private final AbstractSerializer<EntityWrapper> entityJsonSerializer;
private MvccColumnParser( final Id id, final AbstractSerializer<EntityWrapper> entityJsonSerializer ) { = id;
this.entityJsonSerializer = entityJsonSerializer;
public MvccEntity parseColumn( Column<Boolean> column ) {
final EntityWrapper deSerialized;
try {
deSerialized = column.getValue( entityJsonSerializer );
catch ( DataCorruptionException e ) {
"DATA CORRUPTION DETECTED when de-serializing entity with Id {}. This means the"
+ " write was truncated.", id, e );
//return an empty entity, we can never load this one, and we don't want it to bring the system
//to a grinding halt
//TODO fix this
return new MvccEntityImpl( id, UUIDGenerator.newTimeUUID(), MvccEntity.Status.DELETED, Optional.<Entity>absent() );
Optional<Entity> entity = deSerialized.getOptionalEntity() ;
return new MvccEntityImpl( id, deSerialized.getVersion(), deSerialized.getStatus(), entity, deSerialized.getSize());
* We should only ever create this once, since this impl is a singleton
public final class EntitySerializer extends AbstractSerializer<EntityWrapper> {
private final JsonFactory JSON_FACTORY = new JsonFactory();
private final ObjectMapper MAPPER = new ObjectMapper( JSON_FACTORY );
private final Histogram bytesInHistorgram;
private final Histogram bytesOutHistorgram;
private final Timer bytesOutTimer;
private SerializationFig serializationFig;
public EntitySerializer( final SerializationFig serializationFig, final MetricsFactory metricsFactory) {
this.serializationFig = serializationFig;
this.bytesOutHistorgram = metricsFactory.getHistogram(MvccEntitySerializationStrategyV3Impl.class, "bytes.out");
this.bytesOutTimer = metricsFactory.getTimer(MvccEntitySerializationStrategyV3Impl.class, "bytes.out");
this.bytesInHistorgram = metricsFactory.getHistogram(MvccEntitySerializationStrategyV3Impl.class, "");
// mapper.enable(SerializationFeature.INDENT_OUTPUT); don't indent output,
// causes slowness
MAPPER.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" );
public ByteBuffer toByteBuffer( final EntityWrapper wrapper ) {
if (wrapper == null) {
return null;
final byte[] wrapperBytes;
//mark this version as empty
if (wrapper.getEntityMap() == null) {
//we're empty
try {
return ByteBuffer.wrap(MAPPER.writeValueAsBytes(wrapper));
} catch (JsonProcessingException jpe) {
throw new RuntimeException("Unable to serialize entity", jpe);
//we have an entity but status is not complete don't allow it
if (wrapper.getStatus() != MvccEntity.Status.COMPLETE) {
throw new UnsupportedOperationException("Only states " + MvccEntity.Status.DELETED + " and " + MvccEntity.Status.COMPLETE + " are supported");
//Convert to internal entity map
try {
wrapperBytes = MAPPER.writeValueAsBytes(wrapper);
final int maxEntrySize = serializationFig.getMaxEntitySize();
if (wrapperBytes.length > maxEntrySize) {
throw new EntityTooLargeException(Entity.fromMap(wrapper.getEntityMap()), maxEntrySize, wrapperBytes.length,
"Your entity cannot exceed " + maxEntrySize + " bytes. The entity you tried to save was "
+ wrapperBytes.length + " bytes");
} catch (JsonProcessingException jpe) {
throw new RuntimeException("Unable to serialize entity", jpe);
return ByteBuffer.wrap(wrapperBytes);
public EntityWrapper fromByteBuffer( final ByteBuffer byteBuffer ) {
* We intentionally turn data corruption exceptions when we're unable to de-serialize
* the data in cassandra. If this occurs, we'll never be able to de-serialize it
* and it should be considered lost. This is an error that is occurring due to a bug
* in serializing the entity. This is a lazy recognition + repair signal for deployment with
* existing systems.
EntityWrapper entityWrapper;
try {
Timer.Context time = bytesOutTimer.time();
byte[] arr = byteBuffer.array();
bytesOutHistorgram.update( arr == null ? 0 : arr.length);
entityWrapper = MAPPER.readValue(arr, EntityWrapper.class);
entityWrapper.size = arr.length;
catch ( Exception e ) {
if (log.isDebugEnabled()) {
log.debug("Entity Wrapper Deserialized: " + StringSerializer.get().fromByteBuffer(byteBuffer));
throw new DataCorruptionException("Unable to read entity data", e);
// it's been deleted, remove it
if ( entityWrapper.getEntityMap() == null) {
return new EntityWrapper( entityWrapper.getId(), entityWrapper.getVersion(),MvccEntity.Status.DELETED,null,0 );
// it's partial by default
return entityWrapper;
* Simple bean wrapper for state and entity
public static class EntityWrapper {
private Id id;
private MvccEntity.Status status;
private UUID version;
private EntityMap entityMap;
private long size;
public EntityWrapper( ) {
public EntityWrapper( final Id id , final UUID version, final MvccEntity.Status status, final EntityMap entity, final long size ) {
this.version= version;
this.entityMap = entity; = id;
this.size = size;
* do not store status because its based on either the entity being null (deleted) or not null (complete)
* @return
public MvccEntity.Status getStatus() {
return status;
public void setStatus(MvccEntity.Status status) {
this.status = status;
public Id getId() {
return id;
public UUID getVersion() {
return version;
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
public EntityMap getEntityMap() {
return entityMap;
public Optional<Entity> getOptionalEntity() {
Entity entity = Entity.fromMap(getEntityMap());
Optional<Entity> entityReturn = Optional.fromNullable(entity);
//Inject the id into it.
if (entityReturn.isPresent()) {
EntityUtils.setId(entityReturn.get(), getId());
EntityUtils.setVersion(entityReturn.get(), getVersion());
return entityReturn;
public long getSize() {
return size;
* Simple callback to perform puts and deletes with a common row setup code
private static interface RowOp {
* The operation to perform on the row
void doOp( ColumnListMutation<Boolean> colMutation );