/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.usergrid.persistence.cache.impl;

import com.datastax.driver.core.*;
import com.datastax.driver.core.querybuilder.Clause;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Using;
import com.datastax.driver.core.utils.Bytes;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.hash.Funnel;
import com.google.inject.Inject;
import org.apache.usergrid.persistence.cache.CacheScope;
import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.astyanax.*;
import org.apache.usergrid.persistence.core.datastax.CQLUtils;
import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.core.shard.ExpandingShardLocator;
import org.apache.usergrid.persistence.core.shard.StringHashUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;

/**
 * Serializes scoped cache entries to and from Cassandra over CQL. Values are stored as JSON blobs
 * in the {@code SCOPED_CACHE} table, keyed by application and a consistent-hash bucket.
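 * <p>
 * A minimal usage sketch; how the instance and the {@link CacheScope} are obtained is left as
 * placeholders, and the TTL is in seconds:
 * <pre>{@code
 * ScopedCacheSerialization<String, Map<String, Object>> cache = ...; // typically injected
 * CacheScope scope = ...;                                            // scope for the target application
 *
 * cache.writeValue(scope, "user-count", countMap, 60);
 * Map<String, Object> cached =
 *     cache.readValue(scope, "user-count", new TypeReference<Map<String, Object>>() {});
 * cache.removeValue(scope, "user-count");
 * }</pre>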
*/
public class ScopedCacheSerializationImpl<K,V> implements ScopedCacheSerialization<K,V> {
public static final Logger logger = LoggerFactory.getLogger(ScopedCacheSerializationImpl.class);
    // row keys are composed of (app UUID, application type, consistent-hash bucket number, app UUID as string)
    // column names are the K key's toString()
    // column values are the JSON serialization of the V value
private static final String SCOPED_CACHE_TABLE = CQLUtils.quote("SCOPED_CACHE");
private static final Collection<String> SCOPED_CACHE_PARTITION_KEYS = Collections.singletonList("key");
private static final Collection<String> SCOPED_CACHE_COLUMN_KEYS = Collections.singletonList("column1");
private static final Map<String, DataType.Name> SCOPED_CACHE_COLUMNS =
new HashMap<String, DataType.Name>() {{
put( "key", DataType.Name.BLOB );
put( "column1", DataType.Name.BLOB );
put( "value", DataType.Name.BLOB ); }};
private static final Map<String, String> SCOPED_CACHE_CLUSTERING_ORDER =
new HashMap<String, String>(){{ put( "column1", "ASC" ); }};
/** Number of buckets to hash across */
private static final int[] NUM_BUCKETS = {20};
/** How to funnel keys for buckets */
private static final Funnel<String> MAP_KEY_FUNNEL =
(Funnel<String>) (key, into) -> into.putString(key, StringHashUtils.UTF8);
/** Locator to get us all buckets */
private static final ExpandingShardLocator<String>
BUCKET_LOCATOR = new ExpandingShardLocator<>(MAP_KEY_FUNNEL, NUM_BUCKETS);
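    // The bucket number comes from consistently hashing the application UUID string, so reads,
    // writes and deletes for a given application always resolve to the same bucket (and hence the
    // same partition key). Using an ExpandingShardLocator leaves room to grow the bucket count later.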
private final Session session;
private final CassandraConfig cassandraConfig;
private final ObjectMapper MAPPER = new ObjectMapper();
@Inject
public ScopedCacheSerializationImpl( final Session session,
final CassandraConfig cassandraConfig ) {
this.session = session;
this.cassandraConfig = cassandraConfig;
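        // default typing embeds concrete type information in the serialized JSON so cached values can
        // be deserialized back to their original classes; field access is used instead of getters/setters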
MAPPER.enableDefaultTyping();
MAPPER.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.NONE);
MAPPER.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
}
@Override
public V readValue(CacheScope scope, K key, TypeReference typeRef ) {
return readValueCQL( scope, key, typeRef);
}
private V readValueCQL(CacheScope scope, K key, TypeReference typeRef){
Preconditions.checkNotNull(scope, "scope is required");
Preconditions.checkNotNull(key, "key is required");
final String rowKeyString = scope.getApplication().getUuid().toString();
final int bucket = BUCKET_LOCATOR.getCurrentBucket(rowKeyString);
        // the column name is the K key's toString()
final String columnName = key.toString();
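        // column1 is a blob column, so the key string is serialized to its raw UTF-8 bytes via the
        // text codec; matching on partition key plus clustering column selects at most one row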
final Clause inKey = QueryBuilder.eq("key", getPartitionKey(scope, rowKeyString, bucket) );
final Clause inColumn = QueryBuilder.eq("column1", DataType.text().serialize(columnName, ProtocolVersion.NEWEST_SUPPORTED) );
final Statement statement = QueryBuilder.select().all().from(SCOPED_CACHE_TABLE)
.where(inKey)
.and(inColumn)
.setConsistencyLevel(cassandraConfig.getDataStaxReadCl());
final ResultSet resultSet = session.execute(statement);
final com.datastax.driver.core.Row row = resultSet.one();
if (row == null){
if(logger.isDebugEnabled()){
logger.debug("Cache value not found for key {}", key );
}
return null;
}
try {
            // copy only this column's bytes: calling array() on the driver-supplied ByteBuffer can
            // expose the whole backing array rather than just the cached value
            return MAPPER.readValue(Bytes.getArray(row.getBytes("value")), typeRef);
} catch (IOException ioe) {
logger.error("Unable to read cached value", ioe);
throw new RuntimeException("Unable to read cached value", ioe);
}
}
@Override
public V writeValue(CacheScope scope, K key, V value, Integer ttl) {
return writeValueCQL( scope, key, value, ttl);
}
private V writeValueCQL(CacheScope scope, K key, V value, Integer ttl) {
Preconditions.checkNotNull( scope, "scope is required");
Preconditions.checkNotNull( key, "key is required" );
Preconditions.checkNotNull( value, "value is required");
Preconditions.checkNotNull( ttl, "ttl is required");
final String rowKeyString = scope.getApplication().getUuid().toString();
final int bucket = BUCKET_LOCATOR.getCurrentBucket(rowKeyString);
        // the column name is the K key's toString()
final String columnName = key.toString();
// serialize cache item
byte[] cacheBytes;
try {
cacheBytes = MAPPER.writeValueAsBytes(value);
} catch (JsonProcessingException jpe) {
throw new RuntimeException("Unable to serialize cache value", jpe);
}
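        // every cache entry is written with a TTL so Cassandra expires stale values automatically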
final Using timeToLive = QueryBuilder.ttl(ttl);
// convert to ByteBuffer for the blob DataType in Cassandra
        final ByteBuffer bb = ByteBuffer.wrap(cacheBytes);
final Statement cacheEntry = QueryBuilder.insertInto(SCOPED_CACHE_TABLE)
.using(timeToLive)
.value("key", getPartitionKey(scope, rowKeyString, bucket))
.value("column1", DataType.text().serialize(columnName, ProtocolVersion.NEWEST_SUPPORTED))
.value("value", bb);
session.execute(cacheEntry);
logger.debug("Wrote cache item to scope {}\n key/value types {}/{}\n key:value: {}:{}",
scope.getApplication().getUuid(),
key.getClass().getSimpleName(),
value.getClass().getSimpleName(),
key,
value);
return value;
}
@Override
public void removeValue(CacheScope scope, K key) {
removeValueCQL(scope, key);
}
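    // removes a single cache entry (one column); invalidateCQL below drops the application's whole partition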
private void removeValueCQL(CacheScope scope, K key) {
Preconditions.checkNotNull( scope, "scope is required");
Preconditions.checkNotNull( key, "key is required" );
        // determine the bucketed row key based on the application UUID
final String rowKeyString = scope.getApplication().getUuid().toString();
final int bucket = BUCKET_LOCATOR.getCurrentBucket(rowKeyString);
        // the column name is the K key's toString()
final String columnName = key.toString();
final Clause inKey = QueryBuilder.eq("key", getPartitionKey(scope, rowKeyString, bucket) );
final Clause inColumn = QueryBuilder.eq("column1", DataType.text().serialize(columnName, ProtocolVersion.NEWEST_SUPPORTED) );
final Statement statement = QueryBuilder.delete().from(SCOPED_CACHE_TABLE)
.where(inKey)
.and(inColumn);
session.execute(statement);
}
@Override
public void invalidate(CacheScope scope) {
invalidateCQL(scope);
logger.debug("Invalidated scope {}", scope.getApplication().getUuid());
}
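    // deletes by partition key only, which removes every cache entry stored under the application's bucket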
private void invalidateCQL(CacheScope scope){
Preconditions.checkNotNull(scope, "scope is required");
        // determine the bucketed row key based on the application UUID
final String rowKeyString = scope.getApplication().getUuid().toString();
final int bucket = BUCKET_LOCATOR.getCurrentBucket(rowKeyString);
final Clause inKey = QueryBuilder.eq("key", getPartitionKey(scope, rowKeyString, bucket) );
final Statement statement = QueryBuilder.delete().from(SCOPED_CACHE_TABLE)
.where(inKey);
session.execute(statement);
}
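    // no legacy Astyanax column families are required; the schema comes from the CQL table
    // definition returned by getTables() below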
@Override
public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
return Collections.emptyList();
}
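    // table layout: key (blob, partition key), column1 (blob, clustering column, ascending) and
    // value (blob), declared with TableDefinition.CacheOption.KEYS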
@Override
public Collection<TableDefinition> getTables() {
final TableDefinition scopedCache =
new TableDefinition( SCOPED_CACHE_TABLE, SCOPED_CACHE_PARTITION_KEYS, SCOPED_CACHE_COLUMN_KEYS,
SCOPED_CACHE_COLUMNS, TableDefinition.CacheOption.KEYS, SCOPED_CACHE_CLUSTERING_ORDER);
return Collections.singletonList(scopedCache);
}
private ByteBuffer getPartitionKey(CacheScope scope, String key, int bucketNumber){
return serializeKeys(scope.getApplication().getUuid(),
scope.getApplication().getType(), bucketNumber, key);
}
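    // Builds the composite partition key by hand: each component is written as a 2-byte length,
    // the component's serialized bytes, and a single zero end-of-component byte, which mirrors the
    // legacy Thrift/Astyanax composite layout.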
private static ByteBuffer serializeKeys(UUID ownerUUID, String ownerType, int bucketNumber, String rowKeyString ){
List<Object> keys = new ArrayList<>(4);
keys.add(0, ownerUUID);
keys.add(1, ownerType);
keys.add(2, bucketNumber);
keys.add(3, rowKeyString);
// UUIDs are 16 bytes, allocate the buffer accordingly
int size = 16+ownerType.length()+rowKeyString.length();
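        // NOTE: the size estimate assumes one byte per character, which holds for the ASCII strings
        // (a UUID string and the application type) passed in here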
// ints are 4 bytes, add for the bucket
size += 4;
        // each component also needs 2 bytes for its length prefix and 1 byte for the end-of-component (equality) marker
size += keys.size()*3;
ByteBuffer stuff = ByteBuffer.allocate(size);
for (Object key : keys) {
ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
if (kb == null) {
kb = ByteBuffer.allocate(0);
}
stuff.putShort((short) kb.remaining());
stuff.put(kb.slice());
stuff.put((byte) 0);
}
stuff.flip();
return stuff.duplicate();
}
}