/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.usergrid.persistence.map.impl;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.UUID;

import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.UTF8Type;

import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKey;
import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.shard.ExpandingShardLocator;
import org.apache.usergrid.persistence.core.shard.StringHashUtils;
import org.apache.usergrid.persistence.map.MapScope;

import com.google.common.base.Preconditions;
import com.google.common.hash.Funnel;
import com.google.common.hash.PrimitiveSink;
import com.google.inject.Inject;
import com.google.inject.Singleton;

import com.netflix.astyanax.ColumnListMutation;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.CompositeBuilder;
import com.netflix.astyanax.model.CompositeParser;
import com.netflix.astyanax.serializers.BooleanSerializer;
import com.netflix.astyanax.serializers.StringSerializer;
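
/**
 * Astyanax-backed implementation of {@link MapSerialization}. Each entry is written twice:
 * once into {@link #MAP_ENTRIES} (the value, keyed by map name + entry key) and once into
 * {@link #MAP_KEYS} (a bucketed index of the keys that have been written).
 *
 * A minimal usage sketch, assuming Guice provides a configured {@link Keyspace}, that
 * {@code MapScopeImpl} is the scope implementation in use, and given an application
 * {@code Id} named {@code applicationId}:
 * <pre>{@code
 * MapSerialization maps = injector.getInstance( MapSerializationImpl.class );
 * MapScope scope = new MapScopeImpl( applicationId, "device_tokens" );
 * maps.putString( scope, "deviceA", "token-123" );
 * String token = maps.getString( scope, "deviceA" ); // "token-123"
 * maps.delete( scope, "deviceA" );
 * }</pre>
 */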
@Singleton
public class MapSerializationImpl implements MapSerialization {
private static final MapKeySerializer KEY_SERIALIZER = new MapKeySerializer();
private static final BucketScopedRowKeySerializer<String> MAP_KEY_SERIALIZER =
new BucketScopedRowKeySerializer<>( KEY_SERIALIZER );
private static final MapEntrySerializer ENTRY_SERIALIZER = new MapEntrySerializer();
private static final ScopedRowKeySerializer<MapEntryKey> MAP_ENTRY_SERIALIZER =
new ScopedRowKeySerializer<>( ENTRY_SERIALIZER );
private static final BooleanSerializer BOOLEAN_SERIALIZER = BooleanSerializer.get();
private static final StringSerializer STRING_SERIALIZER = StringSerializer.get();
/**
 * Column family holding map entries. The row key is the scoped (map name, entry key) pair;
 * a single column named with a Boolean placeholder (always {@code true}) holds the value.
 */
public static final MultiTennantColumnFamily<ScopedRowKey<MapEntryKey>, Boolean>
MAP_ENTRIES = new MultiTennantColumnFamily<>(
"Map_Entries", MAP_ENTRY_SERIALIZER, BOOLEAN_SERIALIZER );
/**
 * Column family indexing the keys that have been written, scoped to the application and
 * hashed across buckets. The column name is the key itself.
 */
public static final MultiTennantColumnFamily<BucketScopedRowKey<String>, String> MAP_KEYS =
new MultiTennantColumnFamily<>( "Map_Keys", MAP_KEY_SERIALIZER, STRING_SERIALIZER );
/**
* Number of buckets to hash across.
*/
private static final int[] NUM_BUCKETS = {20};
/**
* How to funnel keys for buckets
*/
private static final Funnel<String> MAP_KEY_FUNNEL = new Funnel<String>() {
@Override
public void funnel( final String key, final PrimitiveSink into ) {
into.putString( key, StringHashUtils.UTF8 );
}
};
/**
 * Locator used to resolve the bucket(s) for a given key
 */
private static final ExpandingShardLocator<String>
BUCKET_LOCATOR = new ExpandingShardLocator<>(MAP_KEY_FUNNEL, NUM_BUCKETS);
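/*
 * Note: ExpandingShardLocator hashes a key to one candidate bucket per configured bucket
 * count. With the single count of 20 above, getCurrentBucket( key ) returns a stable value
 * in [0, 20). If the count is ever expanded, getAllBuckets( key ) yields the candidate for
 * every historical count, which is why delete() removes the key from all of them.
 */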
private final Keyspace keyspace;
@Inject
public MapSerializationImpl( final Keyspace keyspace ) { this.keyspace = keyspace; }
@Override
public String getString( final MapScope scope, final String key ) {
Column<Boolean> col = getValue( scope, key ); //the column name is just a Boolean placeholder; the value holds the data
return ( col != null ) ? col.getStringValue() : null;
}
@Override
public void putString( final MapScope scope, final String key, final String value ) {
final RowOp op = new RowOp() {
@Override
public void rowOp( final ScopedRowKey<MapEntryKey> scopedRowKey,
final ColumnListMutation<Boolean> columnListMutation ) {
columnListMutation.putColumn( true, value );
}
};
writeString( scope, key, value, op );
}
@Override
public void putString( final MapScope scope, final String key, final String value, final int ttl ) {
Preconditions.checkArgument( ttl > 0, "ttl must be greater than 0" );
final RowOp op = new RowOp() {
@Override
public void rowOp( final ScopedRowKey<MapEntryKey> scopedRowKey,
final ColumnListMutation<Boolean> columnListMutation ) {
columnListMutation.putColumn( true, value, ttl );
}
};
writeString( scope, key, value, op );
}
/**
 * Write the entry and its key index using the supplied row op
 *
 * @param scope the map scope to write to
 * @param key the entry key
 * @param value the entry value
 * @param rowOp the operation that writes the entry column (with or without a ttl)
 */
private void writeString( final MapScope scope, final String key, final String value, final RowOp rowOp ) {
Preconditions.checkNotNull( scope, "mapscope is required" );
Preconditions.checkNotNull( key, "key is required" );
Preconditions.checkNotNull( value, "value is required" );
final MutationBatch batch = keyspace.prepareMutationBatch();
//add it to the entry
final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
//serialize to the entry
rowOp.rowOp( entryRowKey, batch.withRow( MAP_ENTRIES, entryRowKey ) );
//add it to the keys
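//writes use only the current bucket count; delete() must cover every bucket count the key may have been written under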
final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
final BucketScopedRowKey<String> keyRowKey = BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket );
//serialize to the key index
batch.withRow( MAP_KEYS, keyRowKey ).putColumn( key, true );
executeBatch( batch );
}
private interface RowOp {
/**
 * Callback that writes the entry column into the given row
 * @param scopedRowKey The entry row key
 * @param columnListMutation The column mutation for that row
 */
void rowOp( final ScopedRowKey<MapEntryKey> scopedRowKey, final ColumnListMutation<Boolean> columnListMutation );
}
@Override
public UUID getUuid( final MapScope scope, final String key ) {
Column<Boolean> col = getValue(scope, key);
return ( col != null ) ? col.getUUIDValue() : null;
}
@Override
public void putUuid( final MapScope scope, final String key, final UUID putUuid ) {
Preconditions.checkNotNull(scope, "mapscope is required");
Preconditions.checkNotNull( key, "key is required" );
Preconditions.checkNotNull( putUuid, "value is required" );
final MutationBatch batch = keyspace.prepareMutationBatch();
//add it to the entry
final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
//serialize to the entry
batch.withRow(MAP_ENTRIES, entryRowKey).putColumn(true, putUuid);
//add it to the keys
final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
final BucketScopedRowKey<String> keyRowKey =
BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket );
//serialize to the key index
batch.withRow( MAP_KEYS, keyRowKey ).putColumn( key, true );
executeBatch(batch);
}
@Override
public Long getLong( final MapScope scope, final String key ) {
Column<Boolean> col = getValue(scope, key);
return ( col != null ) ? col.getLongValue() : null;
}
@Override
public void putLong( final MapScope scope, final String key, final Long value ) {
Preconditions.checkNotNull(scope, "mapscope is required");
Preconditions.checkNotNull( key, "key is required" );
Preconditions.checkNotNull( value, "value is required" );
final MutationBatch batch = keyspace.prepareMutationBatch();
//add it to the entry
final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
//serialize to the entry
batch.withRow(MAP_ENTRIES, entryRowKey).putColumn(true, value);
//add it to the keys
final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
final BucketScopedRowKey<String> keyRowKey =
BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket );
//serialize to the key index
batch.withRow( MAP_KEYS, keyRowKey ).putColumn( key, true );
executeBatch(batch);
}
@Override
public void delete( final MapScope scope, final String key ) {
final MutationBatch batch = keyspace.prepareMutationBatch();
final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
//serialize to the entry
batch.withRow(MAP_ENTRIES, entryRowKey).delete();
//remove it from the keys; the bucket count may have changed over time, so delete from every bucket the key could have hashed to
final int[] buckets = BUCKET_LOCATOR.getAllBuckets( key );
final List<BucketScopedRowKey<String>>
rowKeys = BucketScopedRowKey.fromRange( scope.getApplication(), key, buckets );
for ( BucketScopedRowKey<String> rowKey : rowKeys ) {
batch.withRow( MAP_KEYS, rowKey ).deleteColumn( key );
}
executeBatch( batch );
}
@Override
public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
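//note: the three type names below are assumed to be, in order, the row key validator,
//the column comparator and the value validator; UTF8Type on MAP_KEYS matches its string column names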
final MultiTennantColumnFamilyDefinition mapEntries =
new MultiTennantColumnFamilyDefinition( MAP_ENTRIES,
BytesType.class.getSimpleName(),
BytesType.class.getSimpleName(),
BytesType.class.getSimpleName(),
MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
final MultiTennantColumnFamilyDefinition mapKeys =
new MultiTennantColumnFamilyDefinition( MAP_KEYS,
BytesType.class.getSimpleName(),
UTF8Type.class.getSimpleName(),
BytesType.class.getSimpleName(),
MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
return Arrays.asList( mapEntries, mapKeys );
}
private Column<Boolean> getValue(MapScope scope, String key) {
//build the entry row key
final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
//fetch the single Boolean-named column that holds the value
try {
final Column<Boolean> result = keyspace.prepareQuery( MAP_ENTRIES )
.getKey( entryRowKey ).getColumn( true ).execute().getResult();
return result;
}
catch ( NotFoundException nfe ) {
//nothing to return
return null;
}
catch ( ConnectionException e ) {
throw new RuntimeException( "Unable to connect to cassandra", e );
}
}
private void executeBatch(MutationBatch batch) {
try {
batch.execute();
} catch (ConnectionException e) {
throw new RuntimeException("Unable to connect to cassandra", e);
}
}
/**
 * Inner class to serialize the map key
 */
private static class MapKeySerializer implements CompositeFieldSerializer<String> {
@Override
public void toComposite( final CompositeBuilder builder, final String key ) {
builder.addString( key );
}
@Override
public String fromComposite( final CompositeParser composite ) {
final String key = composite.readString();
return key;
}
}
/**
 * Inner class to serialize a MapEntryKey
 */
private static class MapEntrySerializer implements CompositeFieldSerializer<MapEntryKey> {
@Override
public void toComposite( final CompositeBuilder builder, final MapEntryKey key ) {
builder.addString( key.mapName );
builder.addString( key.key );
}
@Override
public MapEntryKey fromComposite( final CompositeParser composite ) {
final String mapName = composite.readString();
final String entryKey = composite.readString();
return new MapEntryKey( mapName, entryKey );
}
}
/**
 * Composite key for a map entry: the map name plus the entry key, serialized into a row key
 */
private static class MapEntryKey {
public final String mapName;
public final String key;
private MapEntryKey( final String mapName, final String key ) {
this.mapName = mapName;
this.key = key;
}
/**
* Create a scoped row key from the key
*/
public static ScopedRowKey<MapEntryKey> fromKey(
final MapScope mapScope, final String key ) {
return ScopedRowKey.fromKey( mapScope.getApplication(), new MapEntryKey( mapScope.getName(), key ) );
}
}
}