/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. The ASF licenses this file to You
* under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. For additional information regarding
* copyright in this work, please see the NOTICE file in the top level
* directory of this distribution.
*/

package org.apache.usergrid.persistence.collection.serialization.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.ReversedType;
import org.apache.cassandra.db.marshal.UUIDType;

import org.apache.usergrid.persistence.collection.EntitySet;
import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
import org.apache.usergrid.persistence.collection.exception.DataCorruptionException;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.serialization.impl.util.LegacyScopeUtils;
import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.EntityUtils;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;

import com.netflix.astyanax.ColumnListMutation;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.model.Row;
import com.netflix.astyanax.query.RowQuery;
import com.netflix.astyanax.serializers.AbstractSerializer;

import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

/**
 * Base serialization strategy for MVCC entities: one wide row per entity, one column per
 * version, newest version first. Concrete subclasses supply the column family and the
 * entity serializer.
 *
 * @author tnine
 */
public abstract class MvccEntitySerializationStrategyImpl implements MvccEntitySerializationStrategy {
private static final Logger logger = LoggerFactory.getLogger( MvccEntitySerializationStrategyImpl.class );
protected final Keyspace keyspace;
protected final SerializationFig serializationFig;
protected final CassandraFig cassandraFig;
private final MultiTenantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> columnFamily;
@Inject
public MvccEntitySerializationStrategyImpl( final Keyspace keyspace, final SerializationFig serializationFig,
final CassandraFig cassandraFig ) {
this.keyspace = keyspace;
this.serializationFig = serializationFig;
this.cassandraFig = cassandraFig;
this.columnFamily = getColumnFamily();
}
@Override
public MutationBatch write( final ApplicationScope applicationScope, final MvccEntity entity ) {
Preconditions.checkNotNull( applicationScope, "applicationScope is required" );
Preconditions.checkNotNull( entity, "entity is required" );
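//storage layout: one wide row per entity id, one column per version (column name = version
//UUID, column value = serialized EntityWrapper); the reversed comparator keeps the newest
//version at the head of the row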
final UUID colName = entity.getVersion();
final Id entityId = entity.getId();
return doWrite( applicationScope, entityId, new RowOp() {
@Override
public void doOp( final ColumnListMutation<UUID> colMutation ) {
colMutation.putColumn( colName, getEntitySerializer()
.toByteBuffer( new EntityWrapper( entity.getStatus(), entity.getEntity() ) ) );
}
} );
}
@Override
public EntitySet load( final ApplicationScope applicationScope, final Collection<Id> entityIds,
final UUID maxVersion ) {
Preconditions.checkNotNull( applicationScope, "applicationScope is required" );
Preconditions.checkNotNull( entityIds, "entityIds is required" );
Preconditions.checkArgument( entityIds.size() > 0, "entityIds must not be empty" );
Preconditions.checkNotNull( maxVersion, "version is required" );
if( entityIds.size() > serializationFig.getMaxLoadSize()){
throw new IllegalArgumentException( "requested load size cannot be over configured maximum of " + serializationFig.getMaxLoadSize() );
}
final Id applicationId = applicationScope.getApplication();
final Id ownerId = applicationId;
final List<ScopedRowKey<CollectionPrefixedKey<Id>>> rowKeys = new ArrayList<>( entityIds.size() );
for ( final Id entityId : entityIds ) {
final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( entityId.getType() );
final CollectionPrefixedKey<Id> collectionPrefixedKey =
new CollectionPrefixedKey<>( collectionName, ownerId, entityId );
final ScopedRowKey<CollectionPrefixedKey<Id>> rowKey =
ScopedRowKey.fromKey( applicationId, collectionPrefixedKey );
rowKeys.add( rowKey );
}
/*
 * Our settings may mean we exceed the maximum thrift buffer size. If we do, we have to make
 * multiple requests instead of one. Perform the calculations and choose the appropriate
 * request pattern.
 */
final int maxEntityResultSizeInBytes = serializationFig.getMaxEntitySize() * entityIds.size();
//if we're less than 1, set the number of requests to 1
final int numberRequests = Math.max(1, maxEntityResultSizeInBytes / cassandraFig.getThriftBufferSize());
//guard against integer division yielding 0, which would make buffer() below throw
final int entitiesPerRequest = Math.max( 1, entityIds.size() / numberRequests );
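//worked example with illustrative numbers: 100 entity ids, a 1 MB max entity size and a
//16 MB thrift buffer give max(1, 100 MB / 16 MB) = 6 requests of 100 / 6 = 16 entities each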
final Scheduler scheduler;
//if it's a single request, run it on the same thread
if(numberRequests == 1){
scheduler = Schedulers.immediate();
}
//if it's more than 1 request, run them on the I/O scheduler
else{
scheduler = Schedulers.io();
}
final EntitySetImpl entitySetResults = Observable.from( rowKeys )
//buffer our entities per request, then for that buffer, execute the query in parallel (if necessary)
.buffer( entitiesPerRequest ).flatMap( listObservable -> {
//execute the query, emitting on the I/O scheduler when we have more than one
//request, otherwise on the current thread
return Observable.just( listObservable ).map( scopedRowKeys -> {
try {
return keyspace.prepareQuery( columnFamily ).getKeySlice( scopedRowKeys )
.withColumnRange( maxVersion, null, false, 1 ).execute().getResult();
}
catch ( ConnectionException e ) {
throw new CollectionRuntimeException( null, applicationScope,
"An error occurred connecting to Cassandra", e );
}
} ).subscribeOn( scheduler );
}, 10 )
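//fold the rows from every request into a single EntitySet, keeping only the first
//(and therefore newest) column of each row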
.reduce( new EntitySetImpl( entityIds.size() ), ( entitySet, rows ) -> {
final Iterator<Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>> latestEntityColumns =
rows.iterator();
while ( latestEntityColumns.hasNext() ) {
final Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> row = latestEntityColumns.next();
final ColumnList<UUID> columns = row.getColumns();
if ( columns.size() == 0 ) {
continue;
}
final Id entityId = row.getKey().getKey().getSubKey();
final Column<UUID> column = columns.getColumnByIndex( 0 );
final MvccEntity parsedEntity =
new MvccColumnParser( entityId, getEntitySerializer() ).parseColumn( column );
entitySet.addEntity( parsedEntity );
}
return entitySet;
} ).toBlocking().last();
return entitySetResults;
}
@Override
public Iterator<MvccEntity> loadDescendingHistory( final ApplicationScope applicationScope, final Id entityId,
final UUID version, final int fetchSize ) {
Preconditions.checkNotNull( applicationScope, "applicationScope is required" );
Preconditions.checkNotNull( entityId, "entity id is required" );
Preconditions.checkNotNull( version, "version is required" );
Preconditions.checkArgument( fetchSize > 0, "fetchSize must be greater than 0" );
final Id applicationId = applicationScope.getApplication();
final Id ownerId = applicationId;
final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( entityId.getType() );
final CollectionPrefixedKey<Id> collectionPrefixedKey =
new CollectionPrefixedKey<>( collectionName, ownerId, entityId );
final ScopedRowKey<CollectionPrefixedKey<Id>> rowKey =
ScopedRowKey.fromKey( applicationId, collectionPrefixedKey );
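//the comparator is ReversedType(UUIDType), so a non-reversed range starting at the given
//version walks versions from newest to oldest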
RowQuery<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> query =
keyspace.prepareQuery( columnFamily ).getKey( rowKey )
.withColumnRange( version, null, false, fetchSize );
return new ColumnNameIterator( query, new MvccColumnParser( entityId, getEntitySerializer() ), false );
}
@Override
public Iterator<MvccEntity> loadAscendingHistory( final ApplicationScope applicationScope, final Id entityId,
final UUID version, final int fetchSize ) {
Preconditions.checkNotNull( applicationScope, "applicationScope is required" );
Preconditions.checkNotNull( entityId, "entity id is required" );
Preconditions.checkNotNull( version, "version is required" );
Preconditions.checkArgument( fetchSize > 0, "fetchSize must be greater than 0" );
final Id applicationId = applicationScope.getApplication();
final Id ownerId = applicationId;
final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( entityId.getType() );
final CollectionPrefixedKey<Id> collectionPrefixedKey =
new CollectionPrefixedKey<>( collectionName, ownerId, entityId );
final ScopedRowKey<CollectionPrefixedKey<Id>> rowKey =
ScopedRowKey.fromKey( applicationId, collectionPrefixedKey );
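//flipping the reversed flag on the reversed comparator inverts iteration, walking versions
//from oldest up to the given version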
RowQuery<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> query =
keyspace.prepareQuery( columnFamily ).getKey( rowKey )
.withColumnRange( null, version, true, fetchSize );
return new ColumnNameIterator( query, new MvccColumnParser( entityId, getEntitySerializer() ), false );
}
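//a single entity load delegates to the batch load, with a current time uuid as the max
//version so the newest available version is returned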
@Override
public Optional<MvccEntity> load( final ApplicationScope scope, final Id entityId ) {
final EntitySet results = load( scope, Collections.singleton( entityId ), UUIDGenerator.newTimeUUID() );
return Optional.fromNullable( results.getEntity( entityId ));
}
@Override
public MutationBatch mark( final ApplicationScope applicationScope, final Id entityId, final UUID version ) {
Preconditions.checkNotNull( applicationScope, "applicationScope is required" );
Preconditions.checkNotNull( entityId, "entity id is required" );
Preconditions.checkNotNull( version, "version is required" );
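//writing a COMPLETE status with an absent entity acts as the delete marker for this
//version: readers see the version but no entity data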
return doWrite( applicationScope, entityId, new RowOp() {
@Override
public void doOp( final ColumnListMutation<UUID> colMutation ) {
colMutation.putColumn( version, getEntitySerializer()
.toByteBuffer( new EntityWrapper( MvccEntity.Status.COMPLETE, Optional.<Entity>absent() ) ) );
}
} );
}
@Override
public MutationBatch delete( final ApplicationScope applicationScope, final Id entityId, final UUID version ) {
Preconditions.checkNotNull( applicationScope, "applicationScope is required" );
Preconditions.checkNotNull( entityId, "entity id is required" );
Preconditions.checkNotNull( version, "version is required" );
return doWrite( applicationScope, entityId, new RowOp() {
@Override
public void doOp( final ColumnListMutation<UUID> colMutation ) {
colMutation.deleteColumn( version );
}
} );
}
@Override
public java.util.Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
//create the CF entity data. We want the column comparator reversed because we want the most
//recent version at the top of the row for fast seeks
MultiTenantColumnFamilyDefinition cf =
new MultiTenantColumnFamilyDefinition( columnFamily, BytesType.class.getSimpleName(),
ReversedType.class.getSimpleName() + "(" + UUIDType.class.getSimpleName() + ")",
BytesType.class.getSimpleName(), MultiTenantColumnFamilyDefinition.CacheOption.KEYS );
return Collections.singleton( cf );
}
/**
* Do the write on the correct row for the entity id with the operation
*/
private MutationBatch doWrite( final ApplicationScope applicationScope, final Id entityId, final RowOp op ) {
final MutationBatch batch = keyspace.prepareMutationBatch();
final Id applicationId = applicationScope.getApplication();
final Id ownerId = applicationId;
final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( entityId.getType() );
final CollectionPrefixedKey<Id> collectionPrefixedKey =
new CollectionPrefixedKey<>( collectionName, ownerId, entityId );
final ScopedRowKey<CollectionPrefixedKey<Id>> rowKey =
ScopedRowKey.fromKey( applicationId, collectionPrefixedKey );
op.doOp( batch.withRow( columnFamily, rowKey ) );
return batch;
}
/**
* Simple callback to perform puts and deletes with a common row setup code
*/
private interface RowOp {
/**
* The operation to perform on the row
*/
void doOp( ColumnListMutation<UUID> colMutation );
}
/**
* Simple bean wrapper for state and entity
*/
protected static class EntityWrapper {
protected final MvccEntity.Status status;
protected final Optional<Entity> entity;
protected EntityWrapper( final MvccEntity.Status status, final Optional<Entity> entity ) {
this.status = status;
this.entity = entity;
}
}
/**
 * Converts raw columns to the MvccEntity representation
 */
private static final class MvccColumnParser implements ColumnParser<UUID, MvccEntity> {
private final Id id;
private final AbstractSerializer<EntityWrapper> entityJsonSerializer;
private MvccColumnParser( Id id, final AbstractSerializer<EntityWrapper> entityJsonSerializer ) {
this.id = id;
this.entityJsonSerializer = entityJsonSerializer;
}
@Override
public MvccEntity parseColumn( Column<UUID> column ) {
final EntityWrapper deSerialized;
final UUID version = column.getName();
try {
deSerialized = column.getValue( entityJsonSerializer );
}
catch ( DataCorruptionException e ) {
logger.error(
"DATA CORRUPTION DETECTED when de-serializing entity with Id {} and version {}. This means the write was truncated.",
id, version, e );
//return an empty entity, we can never load this one, and we don't want it to bring the system
//to a grinding halt
return new MvccEntityImpl( id, version, MvccEntity.Status.DELETED, Optional.<Entity>absent(), 0 );
}
//Inject the id into it.
if ( deSerialized.entity.isPresent() ) {
EntityUtils.setId( deSerialized.entity.get(), id );
}
return new MvccEntityImpl( id, version, deSerialized.status, deSerialized.entity, 0 );
}
}
/**
* Return the entity serializer for this instance
*/
protected abstract AbstractSerializer<EntityWrapper> getEntitySerializer();
/**
 * Get the column family to perform operations with
 * @return the column family used for all reads and writes
 */
protected abstract MultiTenantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> getColumnFamily();
}
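/*
 * A minimal sketch of how a concrete subclass is expected to plug in the two abstract hooks.
 * The class name, serializer and column family constant below are hypothetical, and the
 * MultiTenantColumnFamily construction is elided; the real wiring lives in the concrete
 * subclasses of this strategy.
 *
 * public class MvccEntitySerializationStrategyVXImpl extends MvccEntitySerializationStrategyImpl {
 *
 *     private static final AbstractSerializer<EntityWrapper> ENTITY_SERIALIZER = ...; // hypothetical
 *
 *     private static final MultiTenantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>
 *         CF_ENTITY_DATA = ...; // hypothetical column family definition
 *
 *     @Inject
 *     public MvccEntitySerializationStrategyVXImpl( final Keyspace keyspace,
 *                                                   final SerializationFig serializationFig,
 *                                                   final CassandraFig cassandraFig ) {
 *         super( keyspace, serializationFig, cassandraFig );
 *     }
 *
 *     @Override
 *     protected AbstractSerializer<EntityWrapper> getEntitySerializer() {
 *         return ENTITY_SERIALIZER;
 *     }
 *
 *     @Override
 *     protected MultiTenantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> getColumnFamily() {
 *         return CF_ENTITY_DATA;
 *     }
 * }
 */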