blob: f90f80cefb0f1487ce8d494c291473849bec5452 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.usergrid.persistence.map.impl;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKey;
import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.shard.ExpandingShardLocator;
import org.apache.usergrid.persistence.core.shard.StringHashUtils;
import org.apache.usergrid.persistence.map.MapScope;
import com.google.common.base.Preconditions;
import com.google.common.hash.Funnel;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.ColumnListMutation;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.CompositeBuilder;
import com.netflix.astyanax.model.CompositeParser;
import com.netflix.astyanax.model.ConsistencyLevel;
import com.netflix.astyanax.model.Row;
import com.netflix.astyanax.model.Rows;
import com.netflix.astyanax.serializers.BooleanSerializer;
import com.netflix.astyanax.serializers.StringSerializer;
@Singleton
public class MapSerializationImpl implements MapSerialization {
private static final MapKeySerializer KEY_SERIALIZER = new MapKeySerializer();
private static final BucketScopedRowKeySerializer<String> MAP_KEY_SERIALIZER =
new BucketScopedRowKeySerializer<>( KEY_SERIALIZER );
private static final MapEntrySerializer ENTRY_SERIALIZER = new MapEntrySerializer();
private static final ScopedRowKeySerializer<MapEntryKey> MAP_ENTRY_SERIALIZER =
new ScopedRowKeySerializer<>( ENTRY_SERIALIZER );
private static final BooleanSerializer BOOLEAN_SERIALIZER = BooleanSerializer.get();
private static final StringSerializer STRING_SERIALIZER = StringSerializer.get();
private static final StringResultsBuilder STRING_RESULTS_BUILDER = new StringResultsBuilder();
/**
* CFs where the row key contains the source node id
*/
public static final MultiTenantColumnFamily<ScopedRowKey<MapEntryKey>, Boolean> MAP_ENTRIES =
new MultiTenantColumnFamily<>( "Map_Entries", MAP_ENTRY_SERIALIZER, BOOLEAN_SERIALIZER );
/**
* CFs where the row key contains the source node id
*/
public static final MultiTenantColumnFamily<BucketScopedRowKey<String>, String> MAP_KEYS =
new MultiTenantColumnFamily<>( "Map_Keys", MAP_KEY_SERIALIZER, STRING_SERIALIZER );
/**
* Number of buckets to hash across.
*/
private static final int[] NUM_BUCKETS = { 20 };
/**
* How to funnel keys for buckets
*/
private static final Funnel<String> MAP_KEY_FUNNEL = ( key, into ) -> into.putString( key, StringHashUtils.UTF8 );
/**
* Locator to get us all buckets
*/
private static final ExpandingShardLocator<String> BUCKET_LOCATOR =
new ExpandingShardLocator<>( MAP_KEY_FUNNEL, NUM_BUCKETS );
private final Keyspace keyspace;
private final CassandraConfig cassandraConfig;
@Inject
public MapSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig ) {
this.keyspace = keyspace;
this.cassandraConfig = cassandraConfig;
}
@Override
public String getString( final MapScope scope, final String key ) {
Column<Boolean> col = getValue( scope, key, cassandraConfig.getReadCL() );
return ( col != null ) ? col.getStringValue() : null;
}
@Override
public String getStringHighConsistency( final MapScope scope, final String key ) {
Column<Boolean> col = getValue( scope, key, cassandraConfig.getConsistentReadCL() ); // TODO: why boolean?
return ( col != null ) ? col.getStringValue() : null;
}
@Override
public Map<String, String> getStrings( final MapScope scope, final Collection<String> keys ) {
return getValues( scope, keys, STRING_RESULTS_BUILDER );
}
@Override
public void putString( final MapScope scope, final String key, final String value ) {
final RowOp op = new RowOp() {
@Override
public void putValue( final ColumnListMutation<Boolean> columnListMutation ) {
columnListMutation.putColumn( true, value );
}
@Override
public void putKey( final ColumnListMutation<String> keysMutation ) {
keysMutation.putColumn( key, true );
}
};
writeString( scope, key, value, op );
}
@Override
public void putString( final MapScope scope, final String key, final String value, final int ttl ) {
Preconditions.checkArgument( ttl > 0, "ttl must be > than 0" );
final RowOp op = new RowOp() {
@Override
public void putValue( final ColumnListMutation<Boolean> columnListMutation ) {
columnListMutation.putColumn( true, value, ttl );
}
@Override
public void putKey( final ColumnListMutation<String> keysMutation ) {
keysMutation.putColumn( key, true, ttl );
}
};
writeString( scope, key, value, op );
}
/**
* Write our string index with the specified row op
*/
private void writeString( final MapScope scope, final String key, final String value, final RowOp rowOp ) {
Preconditions.checkNotNull( scope, "mapscope is required" );
Preconditions.checkNotNull( key, "key is required" );
Preconditions.checkNotNull( value, "value is required" );
final MutationBatch batch = keyspace.prepareMutationBatch();
//add it to the entry
final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
//serialize to the
// entry
rowOp.putValue( batch.withRow( MAP_ENTRIES, entryRowKey ) );
//add it to the keys
final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
final BucketScopedRowKey<String> keyRowKey = BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket );
//serialize to the entry
rowOp.putKey( batch.withRow( MAP_KEYS, keyRowKey ) );
executeBatch( batch );
}
/**
* Callbacks for performing row operations
*/
private static interface RowOp {
/**
* Callback to do the row
*
* @param columnListMutation The column mutation
*/
void putValue( final ColumnListMutation<Boolean> columnListMutation );
/**
* Write the key
*/
void putKey( final ColumnListMutation<String> keysMutation );
}
@Override
public UUID getUuid( final MapScope scope, final String key ) {
Column<Boolean> col = getValue( scope, key, cassandraConfig.getReadCL() );
return ( col != null ) ? col.getUUIDValue() : null;
}
@Override
public void putUuid( final MapScope scope, final String key, final UUID putUuid ) {
Preconditions.checkNotNull( scope, "mapscope is required" );
Preconditions.checkNotNull( key, "key is required" );
Preconditions.checkNotNull( putUuid, "value is required" );
final MutationBatch batch = keyspace.prepareMutationBatch();
//add it to the entry
final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
//serialize to the entry
batch.withRow( MAP_ENTRIES, entryRowKey ).putColumn( true, putUuid );
//add it to the keys
final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
final BucketScopedRowKey<String> keyRowKey = BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket );
//serialize to the entry
batch.withRow( MAP_KEYS, keyRowKey ).putColumn( key, true );
executeBatch( batch );
}
@Override
public Long getLong( final MapScope scope, final String key ) {
Column<Boolean> col = getValue( scope, key, cassandraConfig.getReadCL() );
return ( col != null ) ? col.getLongValue() : null;
}
@Override
public void putLong( final MapScope scope, final String key, final Long value ) {
Preconditions.checkNotNull( scope, "mapscope is required" );
Preconditions.checkNotNull( key, "key is required" );
Preconditions.checkNotNull( value, "value is required" );
final MutationBatch batch = keyspace.prepareMutationBatch();
//add it to the entry
final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
//serialize to the entry
batch.withRow( MAP_ENTRIES, entryRowKey ).putColumn( true, value );
//add it to the keys
final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
final BucketScopedRowKey<String> keyRowKey = BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket );
//serialize to the entry
batch.withRow( MAP_KEYS, keyRowKey ).putColumn( key, true );
executeBatch( batch );
}
@Override
public void delete( final MapScope scope, final String key ) {
final MutationBatch batch = keyspace.prepareMutationBatch();
final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
//serialize to the entry
batch.withRow( MAP_ENTRIES, entryRowKey ).delete();
//add it to the keys, we're not sure which one it may have come from
final int[] buckets = BUCKET_LOCATOR.getAllBuckets( key );
final List<BucketScopedRowKey<String>> rowKeys =
BucketScopedRowKey.fromRange( scope.getApplication(), key, buckets );
for ( BucketScopedRowKey<String> rowKey : rowKeys ) {
batch.withRow( MAP_KEYS, rowKey ).deleteColumn( key );
}
executeBatch( batch );
}
@Override
public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
final MultiTenantColumnFamilyDefinition mapEntries =
new MultiTenantColumnFamilyDefinition( MAP_ENTRIES, BytesType.class.getSimpleName(),
BytesType.class.getSimpleName(), BytesType.class.getSimpleName(),
MultiTenantColumnFamilyDefinition.CacheOption.KEYS );
final MultiTenantColumnFamilyDefinition mapKeys =
new MultiTenantColumnFamilyDefinition( MAP_KEYS, BytesType.class.getSimpleName(),
UTF8Type.class.getSimpleName(), BytesType.class.getSimpleName(),
MultiTenantColumnFamilyDefinition.CacheOption.KEYS );
return Arrays.asList( mapEntries, mapKeys );
}
private Column<Boolean> getValue( MapScope scope, String key, final ConsistencyLevel consistencyLevel ) {
//add it to the entry
final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
//now get all columns, including the "old row key value"
try {
final Column<Boolean> result =
keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( consistencyLevel ).getKey( entryRowKey ).getColumn( true ).execute().getResult();
return result;
}
catch ( NotFoundException nfe ) {
//nothing to return
return null;
}
catch ( ConnectionException e ) {
throw new RuntimeException( "Unable to connect to cassandra", e );
}
}
/**
* Get multiple values, using the string builder
*/
private <T> T getValues( final MapScope scope, final Collection<String> keys, final ResultsBuilder<T> builder ) {
final List<ScopedRowKey<MapEntryKey>> rowKeys = new ArrayList<>( keys.size() );
for ( final String key : keys ) {
//add it to the entry
final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
rowKeys.add( entryRowKey );
}
//now get all columns, including the "old row key value"
try {
final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows =
keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( cassandraConfig.getReadCL() ).getKeySlice(
rowKeys ).withColumnSlice( true ).execute()
.getResult();
return builder.buildResults( rows );
}
catch ( NotFoundException nfe ) {
//nothing to return
return null;
}
catch ( ConnectionException e ) {
throw new RuntimeException( "Unable to connect to cassandra", e );
}
}
private void executeBatch( MutationBatch batch ) {
try {
batch.execute();
}
catch ( ConnectionException e ) {
throw new RuntimeException( "Unable to connect to cassandra", e );
}
}
/**
* Inner class to serialize and edgeIdTypeKey
*/
private static class MapKeySerializer implements CompositeFieldSerializer<String> {
@Override
public void toComposite( final CompositeBuilder builder, final String key ) {
builder.addString( key );
}
@Override
public String fromComposite( final CompositeParser composite ) {
final String key = composite.readString();
return key;
}
}
/**
* Inner class to serialize and edgeIdTypeKey
*/
private static class MapEntrySerializer implements CompositeFieldSerializer<MapEntryKey> {
@Override
public void toComposite( final CompositeBuilder builder, final MapEntryKey key ) {
builder.addString( key.mapName );
builder.addString( key.key );
}
@Override
public MapEntryKey fromComposite( final CompositeParser composite ) {
final String mapName = composite.readString();
final String entryKey = composite.readString();
return new MapEntryKey( mapName, entryKey );
}
}
/**
* Entries for serializing map entries and keys to a row
*/
private static class MapEntryKey {
public final String mapName;
public final String key;
private MapEntryKey( final String mapName, final String key ) {
this.mapName = mapName;
this.key = key;
}
/**
* Create a scoped row key from the key
*/
public static ScopedRowKey<MapEntryKey> fromKey( final MapScope mapScope, final String key ) {
return ScopedRowKey.fromKey( mapScope.getApplication(), new MapEntryKey( mapScope.getName(), key ) );
}
}
/**
* Build the results from the row keys
*/
private static interface ResultsBuilder<T> {
public T buildResults( final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows );
}
public static class StringResultsBuilder implements ResultsBuilder<Map<String, String>> {
@Override
public Map<String, String> buildResults( final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows ) {
final int size = rows.size();
final Map<String, String> results = new HashMap<>( size );
for ( int i = 0; i < size; i++ ) {
final Row<ScopedRowKey<MapEntryKey>, Boolean> row = rows.getRowByIndex( i );
final String value = row.getColumns().getStringValue( true, null );
if ( value == null ) {
continue;
}
results.put( row.getKey().getKey().key, value );
}
return results;
}
}
}