blob: 9e29e9c505d95d6dedadb06313053acb8c2103e4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.usergrid.persistence.collection.serialization.impl;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.UUID;
import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.exception.DataCorruptionException;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.model.CompositeBuilder;
import com.netflix.astyanax.model.CompositeParser;
import com.netflix.astyanax.model.Composites;
import com.netflix.astyanax.serializers.AbstractSerializer;
import com.netflix.astyanax.serializers.ByteBufferSerializer;
import com.netflix.astyanax.serializers.BytesArraySerializer;
import com.netflix.astyanax.serializers.UUIDSerializer;
/**
* Version 1 implementation of entity serialization
*/
@Singleton
public class MvccEntitySerializationStrategyV1Impl extends MvccEntitySerializationStrategyImpl {
private static final EntitySerializer ENTITY_JSON_SER = new EntitySerializer();
private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
private static final CollectionScopedRowKeySerializer<Id> ROW_KEY_SER =
new CollectionScopedRowKeySerializer<>( ID_SER );
private static final MultiTenantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> CF_ENTITY_DATA =
new MultiTenantColumnFamily<>( "Entity_Version_Data", ROW_KEY_SER, UUIDSerializer.get() );
@Inject
public MvccEntitySerializationStrategyV1Impl( final Keyspace keyspace, final SerializationFig serializationFig, final CassandraFig cassandraFig ) {
super( keyspace, serializationFig, cassandraFig );
}
@Override
protected AbstractSerializer<MvccEntitySerializationStrategyImpl.EntityWrapper> getEntitySerializer() {
return ENTITY_JSON_SER;
}
@Override
protected MultiTenantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> getColumnFamily() {
return CF_ENTITY_DATA;
}
@Override
public int getImplementationVersion() {
return CollectionDataVersions.INITIAL.getVersion();
}
public static class EntitySerializer extends AbstractSerializer<EntityWrapper> {
private static final ByteBufferSerializer BUFFER_SERIALIZER = ByteBufferSerializer.get();
private static final BytesArraySerializer BYTES_ARRAY_SERIALIZER = BytesArraySerializer.get();
public static final SmileFactory f = new SmileFactory();
public static ObjectMapper mapper;
private static byte[] STATE_COMPLETE = new byte[] { 0 };
private static byte[] STATE_DELETED = new byte[] { 1 };
private static byte[] STATE_PARTIAL = new byte[] { 2 };
private static byte[] VERSION = new byte[] { 0 };
public EntitySerializer() {
try {
mapper = new ObjectMapper( f );
// mapper.enable(SerializationFeature.INDENT_OUTPUT); don't indent output,
// causes slowness
mapper.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" );
}
catch ( Exception e ) {
throw new RuntimeException( "Error setting up mapper", e );
}
}
@Override
public ByteBuffer toByteBuffer( final EntityWrapper wrapper ) {
if ( wrapper == null ) {
return null;
}
CompositeBuilder builder = Composites.newCompositeBuilder();
builder.addBytes( VERSION );
//mark this version as empty
if ( !wrapper.entity.isPresent() ) {
//we're empty
builder.addBytes( STATE_DELETED );
return builder.build();
}
//we have an entity
if ( wrapper.status == MvccEntity.Status.COMPLETE ) {
builder.addBytes( STATE_COMPLETE );
}
else {
builder.addBytes( STATE_PARTIAL );
}
try {
final byte[] entityBytes = mapper.writeValueAsBytes( wrapper.entity.get() );
builder.addBytes( entityBytes );
}
catch ( Exception e ) {
throw new RuntimeException( "Unable to serialize entity", e );
}
return builder.build();
}
@Override
public EntityWrapper fromByteBuffer( final ByteBuffer byteBuffer ) {
/**
* We intentionally turn data corruption exceptions when we're unable to de-serialize
* the data in cassandra. If this occurs, we'll never be able to de-serialize it
* and it should be considered lost. This is an error that is occuring due to a bug
* in serializing the entity. This is a lazy recognition + repair signal for deployment with
* existing systems.
*/
CompositeParser parser;
try {
parser = Composites.newCompositeParser( byteBuffer );
}
catch ( Exception e ) {
throw new DataCorruptionException( "Unable to de-serialze entity", e );
}
byte[] version = parser.read( BYTES_ARRAY_SERIALIZER );
if ( !Arrays.equals( VERSION, version ) ) {
throw new UnsupportedOperationException( "A version of type " + version + " is unsupported" );
}
byte[] state = parser.read( BYTES_ARRAY_SERIALIZER );
// it's been deleted, remove it
if ( Arrays.equals( STATE_DELETED, state ) ) {
return new EntityWrapper( MvccEntity.Status.DELETED, Optional.<Entity>absent() );
}
Entity storedEntity;
ByteBuffer jsonBytes = parser.read( BUFFER_SERIALIZER );
byte[] array = jsonBytes.array();
int start = jsonBytes.arrayOffset();
int length = jsonBytes.remaining();
try {
storedEntity = mapper.readValue( array, start, length, Entity.class );
}
catch ( Exception e ) {
throw new DataCorruptionException( "Unable to read entity data", e );
}
final Optional<Entity> entity = Optional.of( storedEntity );
if ( Arrays.equals( STATE_COMPLETE, state ) ) {
return new EntityWrapper( MvccEntity.Status.COMPLETE, entity );
}
// it's partial by default
return new EntityWrapper( MvccEntity.Status.PARTIAL, entity );
}
}
}