Merge branch 'master' into datastax-cass-driver
diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties
index d153ef1..34d46ad 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -59,7 +59,7 @@
# Set a property to tell Usergrid which version of cassandra is being used.
#
-#cassandra.version=1.2
+cassandra.version=2.1
# Set the Cassandra cluster name that this instance of Usergrid should use.
diff --git a/stack/core/pom.xml b/stack/core/pom.xml
index 6c70342..9f2dc88 100644
--- a/stack/core/pom.xml
+++ b/stack/core/pom.xml
@@ -130,6 +130,10 @@
<groupId>org.antlr</groupId>
</exclusion>
<exclusion>
+ <groupId>net.jpountz.lz4</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
<artifactId>netty</artifactId>
<groupId>io.netty</groupId>
</exclusion>
@@ -367,8 +371,6 @@
<groupId>org.apache.usergrid</groupId>
<artifactId>common</artifactId>
<version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
</dependency>
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java
index 623400d..28b1ccc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java
@@ -112,7 +112,6 @@
}
-
private void setupLegacySchema() throws Exception {
logger.info( "Initialize keyspace and legacy column families" );
@@ -149,7 +148,4 @@
}
}
-
-
-
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ApplicationIndexLocationStrategy.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ApplicationIndexLocationStrategy.java
index e13566d..53d2144 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ApplicationIndexLocationStrategy.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ApplicationIndexLocationStrategy.java
@@ -19,7 +19,7 @@
*/
package org.apache.usergrid.corepersistence.index;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.index.IndexAlias;
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexLocationStrategyFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexLocationStrategyFactoryImpl.java
index 6a99890..93fb677 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexLocationStrategyFactoryImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexLocationStrategyFactoryImpl.java
@@ -21,7 +21,7 @@
import com.google.inject.Inject;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.index.IndexFig;
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ManagementIndexLocationStrategy.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ManagementIndexLocationStrategy.java
index 1e13bbf..ffa02cf 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ManagementIndexLocationStrategy.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ManagementIndexLocationStrategy.java
@@ -20,7 +20,7 @@
package org.apache.usergrid.corepersistence.index;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.index.IndexAlias;
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index e3b179d..4d4edaa 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -263,7 +263,13 @@
}
//this is intentional. If
else if (appId.isPresent()) {
- return Observable.just(appId.get());
+ return Observable.just(appId.get())
+ .doOnNext(appScope -> {
+ //make sure index is initialized on rebuild
+ entityIndexFactory.createEntityIndex(
+ indexLocationStrategyFactory.getIndexLocationStrategy(appScope)
+ ).initialize();
+ });
}
return allApplicationsObservable.getData()
diff --git a/stack/core/src/main/java/org/apache/usergrid/locking/cassandra/AstyanaxLockManagerImpl.java b/stack/core/src/main/java/org/apache/usergrid/locking/cassandra/AstyanaxLockManagerImpl.java
index e26aed4..c0b1f03 100644
--- a/stack/core/src/main/java/org/apache/usergrid/locking/cassandra/AstyanaxLockManagerImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/locking/cassandra/AstyanaxLockManagerImpl.java
@@ -35,6 +35,7 @@
import org.apache.usergrid.locking.Lock;
import org.apache.usergrid.locking.LockManager;
import org.apache.usergrid.locking.LockPathBuilder;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.core.astyanax.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
index ff4f252..f8b1f6c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
@@ -17,7 +17,6 @@
package org.apache.usergrid.persistence.cassandra;
-import com.google.inject.Inject;
import com.google.inject.Injector;
import me.prettyprint.cassandra.connection.HConnectionManager;
import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
@@ -36,7 +35,7 @@
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.SliceQuery;
import org.apache.usergrid.locking.LockManager;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.hector.CountingMutator;
import org.apache.usergrid.utils.MapUtils;
import org.slf4j.Logger;
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexNamingTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexNamingTest.java
index 4fea8e2..1661af4 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexNamingTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexNamingTest.java
@@ -23,7 +23,7 @@
import net.jcip.annotations.NotThreadSafe;
import org.apache.usergrid.corepersistence.TestIndexModule;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
index f44c028..f7d52d6 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
@@ -163,6 +163,17 @@
}
+ @Test
+ public void someTest(){
+
+
+ final String uuidtype = "UUIDType";
+ final String utf8type = "UTF8Type";
+
+ assertEquals(uuidtype.length(), utf8type.length());
+
+ }
+
private List<StreamResult> callStream (final List<Integer> input){
Stream<StreamResult> results = input.stream().map(integer -> {
diff --git a/stack/core/src/test/resources/usergrid-custom-test.properties b/stack/core/src/test/resources/usergrid-custom-test.properties
index dd6612f..9aeac82 100644
--- a/stack/core/src/test/resources/usergrid-custom-test.properties
+++ b/stack/core/src/test/resources/usergrid-custom-test.properties
@@ -14,8 +14,7 @@
# these settings allow tests to run and consistently pass on 16GB MacBook Pro
# with ug.heapmax=5000m and ug.heapmin=3000m (set in Maven settings.xml)
-cassandra.timeout=2000
-cassandra.connections=1000
+cassandra.connections=50
#Not a good number for real systems. Write shards should be 2x cluster size from our tests
diff --git a/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java b/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
index d6ca300..1334650 100644
--- a/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
+++ b/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
@@ -16,6 +16,10 @@
*/
package org.apache.usergrid.persistence.cache.impl;
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Using;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -23,31 +27,20 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.hash.Funnel;
-import com.google.common.hash.PrimitiveSink;
import com.google.inject.Inject;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.Serializer;
-import com.netflix.astyanax.connectionpool.OperationResult;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
-import com.netflix.astyanax.model.Column;
-import com.netflix.astyanax.model.CompositeBuilder;
-import com.netflix.astyanax.model.CompositeParser;
-import com.netflix.astyanax.serializers.ObjectSerializer;
-import com.netflix.astyanax.serializers.StringSerializer;
-import org.apache.cassandra.db.marshal.BytesType;
import org.apache.usergrid.persistence.cache.CacheScope;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.astyanax.*;
+import org.apache.usergrid.persistence.core.datastax.CQLUtils;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.core.shard.ExpandingShardLocator;
import org.apache.usergrid.persistence.core.shard.StringHashUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.concurrent.Callable;
+import java.nio.ByteBuffer;
+import java.util.*;
/**
@@ -55,55 +48,50 @@
*/
public class ScopedCacheSerializationImpl<K,V> implements ScopedCacheSerialization<K,V> {
- // row-keys are application ID + consistent hash key
+ public static final Logger logger = LoggerFactory.getLogger(ScopedCacheSerializationImpl.class);
+
+ // row-keys are (app UUID, application type, consistent hash int as bucket number, app UUID as string)
// column names are K key toString()
// column values are serialization of V value
- public static final Logger logger = LoggerFactory.getLogger(ScopedCacheSerializationImpl.class);
+ private static final String SCOPED_CACHE_TABLE = CQLUtils.quote("SCOPED_CACHE");
+ private static final Collection<String> SCOPED_CACHE_PARTITION_KEYS = Collections.singletonList("key");
+ private static final Collection<String> SCOPED_CACHE_COLUMN_KEYS = Collections.singletonList("column1");
+ private static final Map<String, DataType.Name> SCOPED_CACHE_COLUMNS =
+ new HashMap<String, DataType.Name>() {{
+ put( "key", DataType.Name.BLOB );
+ put( "column1", DataType.Name.BLOB );
+ put( "value", DataType.Name.BLOB ); }};
+ private static final Map<String, String> SCOPED_CACHE_CLUSTERING_ORDER =
+ new HashMap<String, String>(){{ put( "column1", "ASC" ); }};
- private static final CacheRowKeySerializer ROWKEY_SERIALIZER = new CacheRowKeySerializer();
-
- private static final BucketScopedRowKeySerializer<String> BUCKET_ROWKEY_SERIALIZER =
- new BucketScopedRowKeySerializer<>( ROWKEY_SERIALIZER );
-
- private static final Serializer<String> COLUMN_NAME_SERIALIZER = StringSerializer.get();
-
- private static final ObjectSerializer COLUMN_VALUE_SERIALIZER = ObjectSerializer.get();
-
- public static final MultiTenantColumnFamily<BucketScopedRowKey<String>, String> SCOPED_CACHE
- = new MultiTenantColumnFamily<>( "SCOPED_CACHE",
- BUCKET_ROWKEY_SERIALIZER, COLUMN_NAME_SERIALIZER, COLUMN_VALUE_SERIALIZER );
/** Number of buckets to hash across */
private static final int[] NUM_BUCKETS = {20};
/** How to funnel keys for buckets */
- private static final Funnel<String> MAP_KEY_FUNNEL = new Funnel<String>() {
+ private static final Funnel<String> MAP_KEY_FUNNEL =
+ (Funnel<String>) (key, into) -> into.putString(key, StringHashUtils.UTF8);
- @Override
- public void funnel( final String key, final PrimitiveSink into ) {
- into.putString(key, StringHashUtils.UTF8);
- }
- };
-
- /**
- * Locator to get us all buckets
- */
+ /** Locator to get us all buckets */
private static final ExpandingShardLocator<String>
BUCKET_LOCATOR = new ExpandingShardLocator<>(MAP_KEY_FUNNEL, NUM_BUCKETS);
- private final Keyspace keyspace;
+ private final Session session;
+ private final CassandraConfig cassandraConfig;
private final ObjectMapper MAPPER = new ObjectMapper();
- //------------------------------------------------------------------------------------------
+
@Inject
- public ScopedCacheSerializationImpl( final Keyspace keyspace ) {
- this.keyspace = keyspace;
- //MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ public ScopedCacheSerializationImpl( final Session session,
+ final CassandraConfig cassandraConfig ) {
+ this.session = session;
+ this.cassandraConfig = cassandraConfig;
+
MAPPER.enableDefaultTyping();
MAPPER.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.NONE);
MAPPER.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
@@ -113,77 +101,76 @@
@Override
public V readValue(CacheScope scope, K key, TypeReference typeRef ) {
+ return readValueCQL( scope, key, typeRef);
+
+ }
+
+
+ /**
+ * Reads a single cached value with a CQL select against the scoped cache table.
+ *
+ * The partition key is built from the scope's application id/type, the bucket chosen by
+ * BUCKET_LOCATOR for the application UUID string, and that UUID string itself; the
+ * clustering column ("column1") is the text-serialized key.toString().
+ *
+ * @param scope cache scope identifying the owning application, must not be null
+ * @param key cache key, must not be null; its toString() is used as the column name
+ * @param typeRef Jackson type reference used to deserialize the stored bytes
+ * @return the deserialized value, or null when no row exists for the key
+ * @throws RuntimeException wrapping the IOException when the stored bytes cannot be deserialized
+ */
+ private V readValueCQL(CacheScope scope, K key, TypeReference typeRef){
+
Preconditions.checkNotNull(scope, "scope is required");
Preconditions.checkNotNull(key, "key is required");
- // determine bucketed row-key based application UUID
- String rowKeyString = scope.getApplication().getUuid().toString();
+ final String rowKeyString = scope.getApplication().getUuid().toString();
final int bucket = BUCKET_LOCATOR.getCurrentBucket(rowKeyString);
- final BucketScopedRowKey<String> keyRowKey =
- BucketScopedRowKey.fromKey(scope.getApplication(), rowKeyString, bucket);
// determine column name based on K key to string
- String columnName = key.toString();
+ final String columnName = key.toString();
- try {
- try {
- Column<String> result = keyspace.prepareQuery(SCOPED_CACHE)
- .getKey(keyRowKey).getColumn( columnName ).execute().getResult();
+ final Clause inKey = QueryBuilder.eq("key", getPartitionKey(scope, rowKeyString, bucket) );
+ final Clause inColumn = QueryBuilder.eq("column1", DataType.text().serialize(columnName, ProtocolVersion.NEWEST_SUPPORTED) );
- result.getByteBufferValue();
- //V value = MAPPER.readValue(result.getByteArrayValue(), new TypeReference<V>() {});
- V value = MAPPER.readValue(result.getByteArrayValue(), typeRef);
+ final Statement statement = QueryBuilder.select().all().from(SCOPED_CACHE_TABLE)
+ .where(inKey)
+ .and(inColumn)
+ .setConsistencyLevel(cassandraConfig.getDataStaxReadCl());
- logger.debug("Read cache item from scope {}\n key/value types {}/{}\n key:value: {}:{}",
- scope.getApplication().getUuid(),
- key.getClass().getSimpleName(),
- value.getClass().getSimpleName(),
- key,
- value);
+ final ResultSet resultSet = session.execute(statement);
+ final com.datastax.driver.core.Row row = resultSet.one();
- return value;
+ if (row == null){
- } catch (NotFoundException nfe) {
- if(logger.isDebugEnabled()) {
- logger.debug("Value not found");
- }
-
- } catch (IOException ioe) {
- logger.error("Unable to read cached value", ioe);
- throw new RuntimeException("Unable to read cached value", ioe);
+ if(logger.isDebugEnabled()){
+ logger.debug("Cache value not found for key {}", key );
}
- } catch (ConnectionException e) {
- throw new RuntimeException("Unable to connect to cassandra", e);
+ return null;
}
- if(logger.isDebugEnabled()){
- logger.debug("Cache value not found for key {}", key );
+ try {
+
+ // ByteBuffer.array() would expose the whole backing array and ignore position/limit;
+ // Bytes.getArray copies only the readable bytes of the blob.
+ return MAPPER.readValue(com.datastax.driver.core.utils.Bytes.getArray(row.getBytes("value")), typeRef);
+
+ } catch (IOException ioe) {
+ logger.error("Unable to read cached value", ioe);
+ throw new RuntimeException("Unable to read cached value", ioe);
}
- return null;
+
}
@Override
public V writeValue(CacheScope scope, K key, V value, Integer ttl) {
+ return writeValueCQL( scope, key, value, ttl);
+
+ }
+
+ private V writeValueCQL(CacheScope scope, K key, V value, Integer ttl) {
+
Preconditions.checkNotNull( scope, "scope is required");
Preconditions.checkNotNull( key, "key is required" );
Preconditions.checkNotNull( value, "value is required");
Preconditions.checkNotNull( ttl, "ttl is required");
- // determine bucketed row-key based application UUID
- String rowKeyString = scope.getApplication().getUuid().toString();
+ final String rowKeyString = scope.getApplication().getUuid().toString();
final int bucket = BUCKET_LOCATOR.getCurrentBucket(rowKeyString);
- final BucketScopedRowKey<String> keyRowKey =
- BucketScopedRowKey.fromKey(scope.getApplication(), rowKeyString, bucket);
-
// determine column name based on K key to string
- String columnName = key.toString();
+ final String columnName = key.toString();
// serialize cache item
byte[] cacheBytes;
@@ -193,121 +180,156 @@
throw new RuntimeException("Unable to serialize cache value", jpe);
}
- // serialize to the entry
- final MutationBatch batch = keyspace.prepareMutationBatch();
- batch.withRow(SCOPED_CACHE, keyRowKey).putColumn(columnName, cacheBytes, ttl);
+ final Using timeToLive = QueryBuilder.ttl(ttl);
- executeBatch(batch);
+
+ // convert to ByteBuffer for the blob DataType in Cassandra
+ final ByteBuffer bb = ByteBuffer.allocate(cacheBytes.length);
+ bb.put(cacheBytes);
+ bb.flip();
+
+ final Statement cacheEntry = QueryBuilder.insertInto(SCOPED_CACHE_TABLE)
+ .using(timeToLive)
+ .value("key", getPartitionKey(scope, rowKeyString, bucket))
+ .value("column1", DataType.text().serialize(columnName, ProtocolVersion.NEWEST_SUPPORTED))
+ .value("value", bb);
+
+
+ session.execute(cacheEntry);
logger.debug("Wrote cache item to scope {}\n key/value types {}/{}\n key:value: {}:{}",
- scope.getApplication().getUuid(),
- key.getClass().getSimpleName(),
- value.getClass().getSimpleName(),
- key,
- value);
+ scope.getApplication().getUuid(),
+ key.getClass().getSimpleName(),
+ value.getClass().getSimpleName(),
+ key,
+ value);
return value;
+
}
+
@Override
public void removeValue(CacheScope scope, K key) {
+ removeValueCQL(scope, key);
+
+ }
+
+
+ private void removeValueCQL(CacheScope scope, K key) {
+
Preconditions.checkNotNull( scope, "scope is required");
Preconditions.checkNotNull( key, "key is required" );
// determine bucketed row-key based application UUID
- String rowKeyString = scope.getApplication().getUuid().toString();
+ final String rowKeyString = scope.getApplication().getUuid().toString();
final int bucket = BUCKET_LOCATOR.getCurrentBucket(rowKeyString);
- final BucketScopedRowKey<String> keyRowKey =
- BucketScopedRowKey.fromKey(scope.getApplication(), rowKeyString, bucket);
-
// determine column name based on K key to string
- String columnName = key.toString();
+ final String columnName = key.toString();
- final MutationBatch batch = keyspace.prepareMutationBatch();
- batch.withRow(SCOPED_CACHE, keyRowKey).deleteColumn(columnName);
- executeBatch(batch);
+ final Clause inKey = QueryBuilder.eq("key", getPartitionKey(scope, rowKeyString, bucket) );
+ final Clause inColumn = QueryBuilder.eq("column1", DataType.text().serialize(columnName, ProtocolVersion.NEWEST_SUPPORTED) );
+
+ final Statement statement = QueryBuilder.delete().from(SCOPED_CACHE_TABLE)
+ .where(inKey)
+ .and(inColumn);
+
+ session.execute(statement);
+
}
@Override
public void invalidate(CacheScope scope) {
+ invalidateCQL(scope);
+ logger.debug("Invalidated scope {}", scope.getApplication().getUuid());
+
+ }
+
+ private void invalidateCQL(CacheScope scope){
+
Preconditions.checkNotNull(scope, "scope is required");
// determine bucketed row-key based application UUID
- String rowKeyString = scope.getApplication().getUuid().toString();
+ final String rowKeyString = scope.getApplication().getUuid().toString();
final int bucket = BUCKET_LOCATOR.getCurrentBucket(rowKeyString);
- final BucketScopedRowKey<String> keyRowKey =
- BucketScopedRowKey.fromKey(scope.getApplication(), rowKeyString, bucket);
- final MutationBatch batch = keyspace.prepareMutationBatch();
+ final Clause inKey = QueryBuilder.eq("key", getPartitionKey(scope, rowKeyString, bucket) );
- batch.withRow(SCOPED_CACHE, keyRowKey).delete();
+ final Statement statement = QueryBuilder.delete().from(SCOPED_CACHE_TABLE)
+ .where(inKey);
- final OperationResult<Void> result = executeBatch(batch);
+ session.execute(statement);
- logger.debug("Invalidated scope {}", scope.getApplication().getUuid());
}
-
- private class MutationBatchExec implements Callable<Void> {
- private final MutationBatch myBatch;
- private MutationBatchExec(MutationBatch batch) {
- myBatch = batch;
- }
- @Override
- public Void call() throws Exception {
- myBatch.execute();
- return null;
- }
- }
-
-
- private OperationResult<Void> executeBatch(MutationBatch batch) {
- try {
- return batch.execute();
-
- } catch (ConnectionException e) {
- throw new RuntimeException("Unable to connect to cassandra", e);
- }
- }
-
-
- //------------------------------------------------------------------------------------------
-
@Override
public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
- final MultiTenantColumnFamilyDefinition scopedCache =
- new MultiTenantColumnFamilyDefinition( SCOPED_CACHE,
- BytesType.class.getSimpleName(),
- BytesType.class.getSimpleName(),
- BytesType.class.getSimpleName(),
- MultiTenantColumnFamilyDefinition.CacheOption.KEYS );
- return Arrays.asList(scopedCache);
+ return Collections.emptyList();
+ }
+
+ @Override
+ public Collection<TableDefinition> getTables() {
+
+ final TableDefinition scopedCache =
+ new TableDefinition( SCOPED_CACHE_TABLE, SCOPED_CACHE_PARTITION_KEYS, SCOPED_CACHE_COLUMN_KEYS,
+ SCOPED_CACHE_COLUMNS, TableDefinition.CacheOption.KEYS, SCOPED_CACHE_CLUSTERING_ORDER);
+
+ return Collections.singletonList(scopedCache);
}
- /**
- * Inner class to serialize cache key
- */
- private static class CacheRowKeySerializer implements CompositeFieldSerializer<String> {
- @Override
- public void toComposite( final CompositeBuilder builder, final String key ) {
- builder.addString(key);
- }
+ private ByteBuffer getPartitionKey(CacheScope scope, String key, int bucketNumber){
- @Override
- public String fromComposite( final CompositeParser composite ) {
- final String key = composite.readString();
- return key;
+ return serializeKeys(scope.getApplication().getUuid(),
+ scope.getApplication().getType(), bucketNumber, key);
+
+ }
+
+ /**
+ * Packs the four row-key components into a single ByteBuffer, in the order
+ * (ownerUUID, ownerType, bucketNumber, rowKeyString).
+ *
+ * Each component is written as: 2-byte length, the serialized bytes, then a single 0 byte.
+ * NOTE(review): this looks like the legacy composite row-key layout used before the move to
+ * the DataStax driver - confirm against the existing SCOPED_CACHE data.
+ *
+ * @param ownerUUID application id
+ * @param ownerType application type
+ * @param bucketNumber bucket chosen for the row key
+ * @param rowKeyString application UUID rendered as a string
+ * @return a buffer positioned at 0 whose limit is the end of the packed key
+ */
+ private static ByteBuffer serializeKeys(UUID ownerUUID, String ownerType, int bucketNumber, String rowKeyString ){
+
+ final List<Object> keys = Arrays.asList(ownerUUID, ownerType, bucketNumber, rowKeyString);
+ final List<ByteBuffer> parts = new ArrayList<>(keys.size());
+
+ // Serialize first and size the buffer from the real byte counts: String.length() counts
+ // UTF-16 chars, not the UTF-8 bytes that are actually written.
+ int size = 0;
+ for (Object key : keys) {
+
+ ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+ if (kb == null) {
+ kb = ByteBuffer.allocate(0);
+ }
+
+ parts.add(kb);
+
+ // 2-byte length prefix + payload + 1 trailing byte per component
+ size += 2 + kb.remaining() + 1;
+ }
+
+ ByteBuffer stuff = ByteBuffer.allocate(size);
+
+ for (ByteBuffer part : parts) {
+
+ stuff.putShort((short) part.remaining());
+ stuff.put(part.slice());
+ stuff.put((byte) 0);
+
}
+ stuff.flip();
+ return stuff.duplicate();
+
}
}
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index fcaa51d..9f74927 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -19,6 +19,8 @@
package org.apache.usergrid.persistence.collection.impl;
+
+import com.datastax.driver.core.Session;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
@@ -40,7 +42,7 @@
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService;
-import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -69,6 +71,7 @@
private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
private final Keyspace keyspace;
+ private final Session session;
private final MetricsFactory metricsFactory;
private final RxTaskScheduler rxTaskScheduler;
private final ActorSystemManager actorSystemManager;
@@ -104,7 +107,8 @@
actorSystemManager,
uniqueValuesService,
cassandraConfig,
- scope );
+ scope,
+ session);
return target;
}
@@ -132,7 +136,8 @@
final RxTaskScheduler rxTaskScheduler,
final ActorSystemManager actorSystemManager,
final UniqueValuesService uniqueValuesService,
- final CassandraConfig cassandraConfig ) {
+ final CassandraConfig cassandraConfig,
+ final Session session ) {
this.writeStart = writeStart;
this.writeVerifyUnique = writeVerifyUnique;
@@ -153,6 +158,8 @@
this.actorSystemManager = actorSystemManager;
this.uniqueValuesService = uniqueValuesService;
this.cassandraConfig = cassandraConfig;
+ this.session = session;
+
}
@Override
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 9dce7ef..a61e744 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -19,6 +19,9 @@
package org.apache.usergrid.persistence.collection.impl;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
@@ -44,7 +47,7 @@
import org.apache.usergrid.persistence.collection.serialization.impl.MinMaxLogEntryIterator;
import org.apache.usergrid.persistence.collection.serialization.impl.MutableFieldSet;
import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService;
-import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
import org.apache.usergrid.persistence.core.rx.ObservableIterator;
@@ -56,10 +59,10 @@
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.field.Field;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.*;
@@ -95,6 +98,7 @@
private final Keyspace keyspace;
+ private final Session session;
private final Timer writeTimer;
private final Timer deleteTimer;
private final Timer fieldIdTimer;
@@ -132,7 +136,8 @@
final ActorSystemManager actorSystemManager,
final UniqueValuesService uniqueValuesService,
final CassandraConfig cassandraConfig,
- @Assisted final ApplicationScope applicationScope ) {
+ @Assisted final ApplicationScope applicationScope,
+ final Session session ) {
this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
this.entitySerializationStrategy = entitySerializationStrategy;
@@ -157,6 +162,7 @@
this.markCommit = markCommit;
this.keyspace = keyspace;
+ this.session = session;
this.applicationScope = applicationScope;
@@ -309,15 +315,11 @@
public Observable<Id> getIdField( final String type, final Field field ) {
final List<Field> fields = Collections.singletonList( field );
final Observable<Id> idObservable = Observable.from( fields ).map( field1 -> {
- try {
- final UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope, type, fields );
- final UniqueValue value = set.getValue( field1.getName() );
- return value == null ? null : value.getEntityId();
- }
- catch ( ConnectionException e ) {
- logger.error( "Failed to getIdField", e );
- throw new RuntimeException( e );
- }
+
+ final UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope, type, fields );
+ final UniqueValue value = set.getValue( field1.getName() );
+ return value == null ? null : value.getEntityId();
+
} );
return ObservableTimer.time( idObservable, fieldIdTimer );
@@ -331,15 +333,14 @@
public Observable<FieldSet> getEntitiesFromFields(final String type, final Collection<Field> fields,
boolean uniqueIndexRepair) {
final Observable<FieldSet> fieldSetObservable = Observable.just( fields ).map( fields1 -> {
- try {
- final UUID startTime = UUIDGenerator.newTimeUUID();
+ final UUID startTime = UUIDGenerator.newTimeUUID();
//Get back set of unique values that correspond to collection of fields
//Purposely use string consistency as it's extremely important here, regardless of performance
UniqueValueSet set =
uniqueValueSerializationStrategy
- .load( applicationScope, cassandraConfig.getConsistentReadCL(), type, fields1 , uniqueIndexRepair);
+ .load( applicationScope, cassandraConfig.getDataStaxReadConsistentCl(), type, fields1 , uniqueIndexRepair);
//Short circuit if we don't have any uniqueValues from the given fields.
if ( !set.iterator().hasNext() ) {
@@ -360,73 +361,75 @@
return new MutableFieldSet( 0 );
}
+ //Short circuit if we don't have any uniqueValues from the given fields.
+ if ( !set.iterator().hasNext() ) {
+ return new MutableFieldSet( 0 );
+ }
- //loop through each field, and construct an entity load
- List<Id> entityIds = new ArrayList<>( fields1.size() );
- List<UniqueValue> uniqueValues = new ArrayList<>( fields1.size() );
- for ( final Field expectedField : fields1 ) {
+ //loop through each field, and construct an entity load
+ List<Id> entityIds = new ArrayList<>( fields1.size() );
+ List<UniqueValue> uniqueValues = new ArrayList<>( fields1.size() );
- UniqueValue value = set.getValue( expectedField.getName() );
+ for ( final Field expectedField : fields1 ) {
- if ( value == null ) {
- logger.debug( "Field does not correspond to a unique value" );
- }
+ UniqueValue value = set.getValue( expectedField.getName() );
- entityIds.add( value.getEntityId() );
- uniqueValues.add( value );
+ if ( value == null ) {
+ logger.debug( "Field does not correspond to a unique value" );
}
- //Load a entity for each entityId we retrieved.
- final EntitySet entitySet = entitySerializationStrategy.load( applicationScope, entityIds, startTime );
+ entityIds.add( value.getEntityId() );
+ uniqueValues.add( value );
+ }
- //now loop through and ensure the entities are there.
- final MutationBatch deleteBatch = keyspace.prepareMutationBatch();
+ //Load an entity for each entityId we retrieved.
+ final EntitySet entitySet = entitySerializationStrategy.load( applicationScope, entityIds, startTime );
- final MutableFieldSet response = new MutableFieldSet( fields1.size() );
+ final BatchStatement uniqueDeleteBatch = new BatchStatement();
- for ( final UniqueValue expectedUnique : uniqueValues ) {
- final MvccEntity entity = entitySet.getEntity( expectedUnique.getEntityId() );
- //bad unique value, delete this, it's inconsistent
- if ( entity == null || !entity.getEntity().isPresent() ) {
+ final MutableFieldSet response = new MutableFieldSet( fields1.size() );
- if(logger.isTraceEnabled()) {
- logger.trace("Unique value [{}={}] does not have corresponding entity [{}], executing " +
+ for ( final UniqueValue expectedUnique : uniqueValues ) {
+ final MvccEntity entity = entitySet.getEntity( expectedUnique.getEntityId() );
+
+ //bad unique value, delete this, it's inconsistent
+ if ( entity == null || !entity.getEntity().isPresent() ) {
+
+ if(logger.isTraceEnabled()) {
+ logger.trace("Unique value [{}={}] does not have corresponding entity [{}], executing " +
"read repair to remove stale unique value entry",
- expectedUnique.getField().getName(),
- expectedUnique.getField().getValue().toString(),
- expectedUnique.getEntityId()
- );
- }
-
- final MutationBatch valueDelete =
- uniqueValueSerializationStrategy.delete( applicationScope, expectedUnique );
-
- deleteBatch.mergeShallow( valueDelete );
- continue;
+ expectedUnique.getField().getName(),
+ expectedUnique.getField().getValue().toString(),
+ expectedUnique.getEntityId()
+ );
}
- //TODO, we need to validate the property in the entity matches the property in the unique value
-
-
- //else add it to our result set
- response.addEntity( expectedUnique.getField(), entity );
+ uniqueDeleteBatch.add(
+ uniqueValueSerializationStrategy.deleteCQL( applicationScope, expectedUnique ));
+ continue;
}
- if ( deleteBatch.getRowCount() > 0 ) {
+ //TODO, we need to validate the property in the entity matches the property in the unique value
- response.setEntityRepairExecuted(true);
- deleteBatch.execute();
- }
- return response;
+ //else add it to our result set
+ response.addEntity( expectedUnique.getField(), entity );
}
- catch ( ConnectionException e ) {
- logger.error( "Failed to getIdField", e );
- throw new RuntimeException( e );
+
+
+ if ( uniqueDeleteBatch.getStatements().size() > 0 ) {
+
+ response.setEntityRepairExecuted(true);
+ //TODO: explore making this an Async process
+ session.execute(uniqueDeleteBatch);
}
+
+
+ return response;
+
} );
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
index 8aa5cfc..9f2b994 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
@@ -26,6 +26,8 @@
import java.util.List;
import java.util.UUID;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +65,7 @@
private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
private final Keyspace keyspace;
+ private final Session session;
private final SerializationFig serializationFig;
@@ -70,12 +73,14 @@
@Inject
public UniqueCleanup( final SerializationFig serializationFig,
final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
- final Keyspace keyspace, final MetricsFactory metricsFactory ) {
+ final Keyspace keyspace, final MetricsFactory metricsFactory,
+ final Session session ) {
this.serializationFig = serializationFig;
this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
this.keyspace = keyspace;
this.uniqueCleanupTimer = metricsFactory.getTimer( UniqueCleanup.class, "uniquecleanup.base" );
+ this.session = session;
}
@@ -127,22 +132,20 @@
//roll them up
.doOnNext( uniqueValues -> {
- final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
+
+ final BatchStatement uniqueCleanupBatch = new BatchStatement();
for ( UniqueValue value : uniqueValues ) {
logger
.debug( "Deleting value:{} from application scope: {} ", value, applicationScope );
uniqueCleanupBatch
- .mergeShallow( uniqueValueSerializationStrategy.delete( applicationScope, value ) );
+ .add( uniqueValueSerializationStrategy.deleteCQL( applicationScope, value ) );
}
- try {
- uniqueCleanupBatch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to execute batch mutation", e );
- }
+
+ session.execute(uniqueCleanupBatch);
+
} ).lastOrDefault( Collections.emptyList() ).map( list -> mvccEntityCollectionIoEvent );
return ObservableTimer.time( uniqueValueCleanup, uniqueCleanupTimer );
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
index 23c6dfe..e5c4c96 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
@@ -18,6 +18,8 @@
package org.apache.usergrid.persistence.collection.mvcc.stage.write;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,14 +53,17 @@
private final UniqueValueSerializationStrategy uniqueValueStrat;
private final MvccLogEntrySerializationStrategy logEntryStrat;
+ private final Session session;
@Inject
- public RollbackAction(MvccLogEntrySerializationStrategy logEntryStrat,
- UniqueValueSerializationStrategy uniqueValueStrat ) {
+ public RollbackAction( final MvccLogEntrySerializationStrategy logEntryStrat,
+ final UniqueValueSerializationStrategy uniqueValueStrat,
+ final Session session ) {
this.uniqueValueStrat = uniqueValueStrat;
this.logEntryStrat = logEntryStrat;
+ this.session = session;
}
@@ -72,6 +77,7 @@
// one batch to handle rollback
MutationBatch rollbackMb = null;
+ final BatchStatement uniqueDeleteBatch = new BatchStatement();
final Optional<Entity> entity = mvccEntity.getEntity();
if ( entity.isPresent() ) {
@@ -83,45 +89,17 @@
UniqueValue toDelete =
new UniqueValueImpl( field, entity.get().getId(), mvccEntity.getVersion() );
- MutationBatch deleteMb = uniqueValueStrat.delete(scope, toDelete );
+ uniqueDeleteBatch.add(uniqueValueStrat.deleteCQL(scope, toDelete ));
- if ( rollbackMb == null ) {
- rollbackMb = deleteMb;
- }
- else {
- rollbackMb.mergeShallow( deleteMb );
- }
}
}
-
- if ( rollbackMb != null ) {
- try {
- rollbackMb.execute();
- }
- catch ( ConnectionException ex ) {
- throw new RuntimeException( "Error rolling back changes", ex );
- }
- }
+ // execute the batch statements for deleting unique field entries
+ session.execute(uniqueDeleteBatch);
logEntryStrat.delete( scope, entity.get().getId(), mvccEntity.getVersion() );
}
}
}
-
- class FieldDeleteResult {
-
- private final String name;
-
-
- public FieldDeleteResult( String name ) {
- this.name = name;
- }
-
-
- public String getName() {
- return this.name;
- }
- }
}
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index 5b98ca5..fc1382f 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -22,11 +22,16 @@
import java.util.Map;
import java.util.UUID;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
+
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValueException;
import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesFig;
import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,6 +83,8 @@
private final MvccEntitySerializationStrategy entityStrat;
+ private final Session session;
+
@Inject
public WriteCommit( final MvccLogEntrySerializationStrategy logStrat,
@@ -85,7 +92,9 @@
final UniqueValueSerializationStrategy uniqueValueStrat,
final ActorSystemFig actorSystemFig,
final UniqueValuesFig uniqueValuesFig,
- final UniqueValuesService akkaUvService ) {
+ final UniqueValuesService akkaUvService,
+ final Session session ) {
+
Preconditions.checkNotNull( logStrat, "MvccLogEntrySerializationStrategy is required" );
Preconditions.checkNotNull( entryStrat, "MvccEntitySerializationStrategy is required" );
@@ -97,6 +106,8 @@
this.actorSystemFig = actorSystemFig;
this.uniqueValuesFig = uniqueValuesFig;
this.akkaUvService = akkaUvService;
+ this.session = session;
+
}
@@ -125,6 +136,8 @@
final MvccLogEntry startEntry =
new MvccLogEntryImpl( entityId, version, Stage.COMMITTED, MvccLogEntry.State.COMPLETE );
+
+
MutationBatch logMutation = logEntryStrat.write( applicationScope, startEntry );
// now get our actual insert into the entity data
@@ -166,20 +179,22 @@
final Entity entity = mvccEntity.getEntity().get();
// re-write the unique values but this time with no TTL
+ final BatchStatement uniqueBatch = new BatchStatement();
+
for ( Field field : EntityUtils.getUniqueFields(mvccEntity.getEntity().get()) ) {
UniqueValue written = new UniqueValueImpl( field, entity.getId(), version);
- MutationBatch mb = uniqueValueStrat.write(scope, written );
+ uniqueBatch.add(uniqueValueStrat.writeCQL(scope, written, -1 ));
logger.debug("Finalizing {} unique value {}", field.getName(), field.getValue().toString());
- // merge into our existing mutation batch
- logMutation.mergeShallow( mb );
+
}
try {
logMutation.execute();
+ session.execute(uniqueBatch);
}
catch ( ConnectionException e ) {
logger.error( "Failed to execute write asynchronously ", e );
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index a3565ea..e7dbb10 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -18,16 +18,20 @@
package org.apache.usergrid.persistence.collection.mvcc.stage.write;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.netflix.hystrix.HystrixCommandProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.model.ConsistencyLevel;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
-import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
import org.apache.usergrid.persistence.collection.MvccEntity;
@@ -39,17 +43,16 @@
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValueException;
import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesFig;
import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService;
-import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.field.Field;
import org.apache.usergrid.persistence.model.util.EntityUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
import rx.functions.Action1;
import java.util.*;
@@ -63,19 +66,22 @@
private static final Logger logger = LoggerFactory.getLogger( WriteUniqueVerify.class );
- ActorSystemFig actorSystemFig;
- UniqueValuesFig uniqueValuesFig;
- UniqueValuesService akkaUvService;
+ private ActorSystemFig actorSystemFig;
+ private UniqueValuesFig uniqueValuesFig;
+ private UniqueValuesService akkaUvService;
private final UniqueValueSerializationStrategy uniqueValueStrat;
- public static int uniqueVerifyPoolSize = 100;
+ private static int uniqueVerifyPoolSize = 100;
private static int uniqueVerifyTimeoutMillis= 5000;
protected final SerializationFig serializationFig;
protected final Keyspace keyspace;
+
+ protected final Session session;
+
private final CassandraConfig cassandraFig;
@@ -86,13 +92,15 @@
final CassandraConfig cassandraFig,
final ActorSystemFig actorSystemFig,
final UniqueValuesFig uniqueValuesFig,
- final UniqueValuesService akkaUvService ) {
+ final UniqueValuesService akkaUvService,
+ final Session session ) {
this.keyspace = keyspace;
this.cassandraFig = cassandraFig;
this.actorSystemFig = actorSystemFig;
this.uniqueValuesFig = uniqueValuesFig;
this.akkaUvService = akkaUvService;
+ this.session = session;
Preconditions.checkNotNull( uniqueValueSerializiationStrategy, "uniqueValueSerializationStrategy is required" );
Preconditions.checkNotNull( serializationFig, "serializationFig is required" );
@@ -150,7 +158,7 @@
final ApplicationScope scope = ioevent.getEntityCollection();
- final MutationBatch batch = keyspace.prepareMutationBatch();
+ final BatchStatement batch = new BatchStatement();
//allocate our max size, worst case
final List<Field> uniqueFields = new ArrayList<>( entity.getFields().size() );
@@ -168,48 +176,42 @@
// use write-first then read strategy
final UniqueValue written = new UniqueValueImpl( field, mvccEntity.getId(), mvccEntity.getVersion() );
- try {
- // don't use read repair on this pre-write check
- // stronger consistency is extremely important here, more so than performance
- UniqueValueSet set = uniqueValueStrat.load(scope, cassandraFig.getConsistentReadCL(),
- written.getEntityId().getType(), Collections.singletonList(written.getField()), false);
+ // don't use read repair on this pre-write check
+ // stronger consistency is extremely important here, more so than performance
+ UniqueValueSet set = uniqueValueStrat.load(scope, cassandraFig.getDataStaxReadConsistentCl(),
+ written.getEntityId().getType(), Collections.singletonList(written.getField()), false);
- set.forEach(uniqueValue -> {
+ set.forEach(uniqueValue -> {
- if(!uniqueValue.getEntityId().getUuid().equals(written.getEntityId().getUuid())){
+ if(!uniqueValue.getEntityId().getUuid().equals(written.getEntityId().getUuid())){
- if(logger.isTraceEnabled()){
- logger.trace("Pre-write violation detected. Attempted write for unique value [{}={}] and " +
- "entity id [{}], entity version [{}] conflicts with already existing entity id [{}], " +
- "entity version [{}]",
- written.getField().getName(),
- written.getField().getValue().toString(),
- written.getEntityId().getUuid(),
- written.getEntityVersion(),
- uniqueValue.getEntityId().getUuid(),
- uniqueValue.getEntityVersion());
- }
-
- preWriteUniquenessViolations.put(field.getName(), field);
-
+ if(logger.isTraceEnabled()){
+ logger.trace("Pre-write violation detected. Attempted write for unique value [{}={}] and " +
+ "entity id [{}], entity version [{}] conflicts with already existing entity id [{}], " +
+ "entity version [{}]",
+ written.getField().getName(),
+ written.getField().getValue().toString(),
+ written.getEntityId().getUuid(),
+ written.getEntityVersion(),
+ uniqueValue.getEntityId().getUuid(),
+ uniqueValue.getEntityVersion());
}
- });
+ preWriteUniquenessViolations.put(field.getName(), field);
+
+ }
+
+ });
- } catch (ConnectionException e) {
-
- throw new RuntimeException("Error connecting to cassandra", e);
- }
// only build the batch statement if we don't have a violation for the field
if( preWriteUniquenessViolations.get(field.getName()) == null) {
// use TTL in case something goes wrong before entity is finally committed
- final MutationBatch mb = uniqueValueStrat.write(scope, written, serializationFig.getTimeout());
+ batch.add(uniqueValueStrat.writeCQL(scope, written, serializationFig.getTimeout()));
- batch.mergeShallow(mb);
uniqueFields.add(field);
}
}
@@ -228,12 +230,8 @@
}
//perform the write
- try {
- batch.execute();
- }
- catch ( ConnectionException ex ) {
- throw new RuntimeException( "Unable to write to cassandra", ex );
- }
+ session.execute(batch);
+
// use simple thread pool to verify fields in parallel
ConsistentReplayCommand cmd = new ConsistentReplayCommand(
@@ -273,29 +271,26 @@
@Override
protected Map<String, Field> run() throws Exception {
- return executeStrategy(fig.getReadCL());
+ return executeStrategy(fig.getDataStaxReadCl());
}
@Override
protected Map<String, Field> getFallback() {
- // fallback with same CL as there are many reasons the 1st execution failed,
- // not just due to consistency problems
- return executeStrategy(fig.getReadCL());
+ // fallback with same CL as there are many reasons the 1st execution failed, not just due to consistency problems
+ return executeStrategy(fig.getDataStaxReadCl());
+
}
public Map<String, Field> executeStrategy(ConsistencyLevel consistencyLevel){
- //allocate our max size, worst case
- //now get the set of fields back
+
final UniqueValueSet uniqueValues;
- try {
- // load ascending for verification to make sure we wrote is the last read back
- // don't read repair on this read because our write-first strategy will introduce a duplicate
- uniqueValues =
- uniqueValueSerializationStrategy.load( scope, consistencyLevel, type, uniqueFields, false);
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to read from cassandra", e );
- }
+
+ // load ascending for verification to make sure what we wrote is the last read back
+ // don't read repair on this read because our write-first strategy will introduce a duplicate
+ uniqueValues =
+ uniqueValueSerializationStrategy.load( scope, consistencyLevel, type, uniqueFields, false);
+
+
final Map<String, Field> uniquenessViolations = new HashMap<>( uniqueFields.size() );
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
index c6c70b9..cb6cd2b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
@@ -21,16 +21,14 @@
import java.util.Collection;
import java.util.Iterator;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
import org.apache.usergrid.persistence.core.migration.data.VersionedData;
import org.apache.usergrid.persistence.core.migration.schema.Migration;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.field.Field;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.model.ConsistencyLevel;
-
/**
* Reads and writes to UniqueValues column family.
@@ -39,24 +37,15 @@
/**
- * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds.
- *
- * @param applicationScope scope
- * @param uniqueValue Object to be written
- *
- * @return MutatationBatch that encapsulates operation, caller may or may not execute.
- */
- MutationBatch write( ApplicationScope applicationScope, UniqueValue uniqueValue );
-
- /**
- * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds.
+ * Write the specified UniqueValue to Cassandra with optional timeToLive in seconds. -1 is the same as no ttl
+ * (lives forever)
*
* @param applicationScope scope
* @param uniqueValue Object to be written
* @param timeToLive How long object should live in seconds. -1 implies store forever
- * @return MutatationBatch that encapsulates operation, caller may or may not execute.
+ * @return BatchStatement that encapsulates CQL statements, caller may or may not execute.
*/
- MutationBatch write( ApplicationScope applicationScope, UniqueValue uniqueValue, int timeToLive );
+ BatchStatement writeCQL(ApplicationScope applicationScope, UniqueValue uniqueValue, int timeToLive );
/**
* Load UniqueValue that matches field from collection or null if that value does not exist. Returns the oldest
@@ -68,10 +57,9 @@
*
* @return UniqueValueSet containing fields from the collection that exist in cassandra
*
- * @throws ConnectionException on error connecting to Cassandra
*/
- UniqueValueSet load( ApplicationScope applicationScope, String type, Collection<Field> fields )
- throws ConnectionException;
+ UniqueValueSet load( ApplicationScope applicationScope, String type, Collection<Field> fields );
+
/**
* Load UniqueValue that matches field from collection or null if that value does not exist. Returns the oldest
@@ -83,10 +71,9 @@
*
* @return UniqueValueSet containing fields from the collection that exist in cassandra
*
- * @throws ConnectionException on error connecting to Cassandra
*/
UniqueValueSet load( ApplicationScope applicationScope, String type, Collection<Field> fields,
- boolean useReadRepair ) throws ConnectionException;
+ boolean useReadRepair );
/**
* Load UniqueValue that matches field from collection or null if that value does not exist.
@@ -95,13 +82,13 @@
* @param consistencyLevel Consistency level of query
* @param type The type the unique value exists within
* @param fields Field name/value to search for
+ * @return UniqueValueSet containing fields from the collection that exist in cassandra
+ *
* @param useReadRepair
* @return UniqueValueSet containing fields from the collection that exist in cassandra
- * @throws ConnectionException on error connecting to Cassandra
*/
UniqueValueSet load(ApplicationScope applicationScope, ConsistencyLevel consistencyLevel, String type,
- Collection<Field> fields, boolean useReadRepair) throws ConnectionException;
-
+ Collection<Field> fields, boolean useReadRepair);
/**
* Loads the currently persisted history of every unique value the entity has held. This will
@@ -119,9 +106,9 @@
*
* @param applicationScope The scope of the application
* @param uniqueValue Object to be deleted.
- * @return MutatationBatch that encapsulates operation, caller may or may not execute.
+ * @return BatchStatement that encapsulates the CQL statements, caller may or may not execute.
*/
- MutationBatch delete( ApplicationScope applicationScope, UniqueValue uniqueValue );
+ BatchStatement deleteCQL( ApplicationScope applicationScope, UniqueValue uniqueValue );
}
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/AllUniqueFieldsIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/AllUniqueFieldsIterator.java
new file mode 100644
index 0000000..ed210e9
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/AllUniqueFieldsIterator.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+
+import java.util.Iterator;
+
+
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersion.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersion.java
index 274cf5d..d451adc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersion.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersion.java
@@ -42,7 +42,8 @@
return entityVersion;
}
- public boolean equals( Object o ) {
+ @Override
+ public boolean equals( final Object o ) {
if ( o == null || !(o instanceof EntityVersion) ) {
return false;
@@ -60,5 +61,12 @@
return true;
}
-
+
+ @Override
+ public int hashCode() {
+ int result = entityId.hashCode();
+ result = 31 * result + entityVersion.hashCode();
+ return result;
+ }
+
}
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
index 57199e2..2d7892a 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.UUID;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +41,7 @@
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.serialization.impl.util.LegacyScopeUtils;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
@@ -342,6 +343,12 @@
return Collections.singleton( cf );
}
+ @Override
+ public Collection<TableDefinition> getTables() {
+
+ return Collections.emptyList();
+ }
+
/**
* Do the write on the correct row for the entity id with the operation
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java
index 1fe342e..19735cc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java
@@ -27,6 +27,7 @@
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.CollectionMigrationPlugin;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.core.migration.data.MigrationInfoCache;
import org.apache.usergrid.persistence.core.migration.data.MigrationRelationship;
import org.apache.usergrid.persistence.core.migration.data.VersionedMigrationSet;
@@ -186,6 +187,11 @@
return Collections.emptyList();
}
+ @Override
+ public Collection<TableDefinition> getTables() {
+ return Collections.emptyList();
+ }
+
@Override
public int getImplementationVersion() {
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
index 9e29e9c..ee548e3 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
@@ -26,7 +26,7 @@
import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.exception.DataCorruptionException;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
index 13d9613..e9b0781 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
@@ -29,7 +29,7 @@
import org.apache.usergrid.persistence.collection.exception.DataCorruptionException;
import org.apache.usergrid.persistence.collection.exception.EntityTooLargeException;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.core.astyanax.FieldBuffer;
import org.apache.usergrid.persistence.core.astyanax.FieldBufferBuilder;
import org.apache.usergrid.persistence.core.astyanax.FieldBufferParser;
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
index 57607d0..ecd0f5c 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
@@ -16,6 +16,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.netflix.astyanax.serializers.StringSerializer;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,7 +32,7 @@
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
@@ -304,6 +305,12 @@
return Collections.singleton( cf );
}
+ @Override
+ public Collection<TableDefinition> getTables() {
+
+ return Collections.emptyList();
+ }
+
/**
* Do the write on the correct row for the entity id with the operation
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java
index 8e34f01..b27651d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationProxyImpl.java
@@ -30,6 +30,7 @@
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.CollectionMigrationPlugin;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.core.migration.data.MigrationInfoCache;
import org.apache.usergrid.persistence.core.migration.data.MigrationRelationship;
import org.apache.usergrid.persistence.core.migration.data.VersionedMigrationSet;
@@ -146,6 +147,11 @@
return Collections.emptyList();
}
+ @Override
+ public Collection<TableDefinition> getTables(){
+ return Collections.emptyList();
+ }
+
@Override
public int getImplementationVersion() {
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyV1Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyV1Impl.java
index e0c0909..62c35bb 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyV1Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyV1Impl.java
@@ -35,6 +35,7 @@
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.model.entity.Id;
import com.google.inject.Inject;
@@ -115,4 +116,10 @@
return Collections.singleton( cf );
}
+
+ @Override
+ public Collection<TableDefinition> getTables() {
+
+ return Collections.emptyList();
+ }
}
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyV2Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyV2Impl.java
index 1bfc289..325762d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyV2Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyV2Impl.java
@@ -35,6 +35,7 @@
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.model.entity.Id;
import com.google.inject.Inject;
@@ -103,4 +104,10 @@
return Collections.singleton( cf );
}
+
+ @Override
+ public Collection<TableDefinition> getTables() {
+
+ return Collections.emptyList();
+ }
}
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SettingsValidation.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SettingsValidation.java
index 6715cfb..0f3f6b9 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SettingsValidation.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SettingsValidation.java
@@ -21,7 +21,7 @@
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraFig;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
index 8c1f2d2..0753281 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
@@ -18,39 +18,37 @@
package org.apache.usergrid.persistence.collection.serialization.impl;
+
+import java.nio.ByteBuffer;
import java.util.*;
-import com.netflix.astyanax.util.RangeBuilder;
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Using;
+import org.apache.usergrid.persistence.core.CassandraConfig;
+import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.field.*;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.db.marshal.BytesType;
-
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
-import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
-import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
-import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
-import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
-import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.field.Field;
import com.google.common.base.Preconditions;
+
import com.netflix.astyanax.ColumnListMutation;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.model.Column;
-import com.netflix.astyanax.model.ConsistencyLevel;
-import com.netflix.astyanax.model.Row;
-import com.netflix.astyanax.query.RowQuery;
+
/**
@@ -61,13 +59,15 @@
private static final Logger logger = LoggerFactory.getLogger( UniqueValueSerializationStrategyImpl.class );
-
- private final MultiTenantColumnFamily<ScopedRowKey<FieldKey>, EntityVersion>
- CF_UNIQUE_VALUES;
+ public static final String UUID_TYPE_REVERSED = "UUIDType(reversed=true)";
- private final MultiTenantColumnFamily<ScopedRowKey<EntityKey>, UniqueFieldEntry>
- CF_ENTITY_UNIQUE_VALUE_LOG ;
+ private final String TABLE_UNIQUE_VALUES;
+ private final String TABLE_UNIQUE_VALUES_LOG;
+
+ private final Map COLUMNS_UNIQUE_VALUES;
+ private final Map COLUMNS_UNIQUE_VALUES_LOG;
+
public static final int COL_VALUE = 0x0;
@@ -75,33 +75,51 @@
private final SerializationFig serializationFig;
- protected final Keyspace keyspace;
private final CassandraFig cassandraFig;
+ private final Session session;
+ private final CassandraConfig cassandraConfig;
+
/**
* Construct serialization strategy for keyspace.
*
- * @param keyspace Keyspace in which to store Unique Values.
* @param cassandraFig The cassandra configuration
* @param serializationFig The serialization configuration
*/
- public UniqueValueSerializationStrategyImpl( final Keyspace keyspace, final CassandraFig cassandraFig,
- final SerializationFig serializationFig ) {
- this.keyspace = keyspace;
+ public UniqueValueSerializationStrategyImpl( final CassandraFig cassandraFig,
+ final SerializationFig serializationFig,
+ final Session session,
+ final CassandraConfig cassandraConfig) {
this.cassandraFig = cassandraFig;
this.serializationFig = serializationFig;
- CF_UNIQUE_VALUES = getUniqueValuesCF();
- CF_ENTITY_UNIQUE_VALUE_LOG = getEntityUniqueLogCF();
+ this.session = session;
+ this.cassandraConfig = cassandraConfig;
+
+ TABLE_UNIQUE_VALUES = getUniqueValuesTable().getTableName();
+ TABLE_UNIQUE_VALUES_LOG = getEntityUniqueLogTable().getTableName();
+
+ COLUMNS_UNIQUE_VALUES = getUniqueValuesTable().getColumns();
+ COLUMNS_UNIQUE_VALUES_LOG = getEntityUniqueLogTable().getColumns();
+
}
-
- public MutationBatch write( final ApplicationScope collectionScope, UniqueValue value ) {
+ @Override
+ public BatchStatement writeCQL( final ApplicationScope collectionScope, final UniqueValue value,
+ final int timeToLive ){
Preconditions.checkNotNull( value, "value is required" );
+ BatchStatement batch = new BatchStatement();
+
+ Using ttl = null;
+ if(timeToLive > 0){
+
+ ttl = QueryBuilder.ttl(timeToLive);
+
+ }
final Id entityId = value.getEntityId();
final UUID entityVersion = value.getEntityVersion();
@@ -110,66 +128,58 @@
ValidationUtils.verifyIdentity( entityId );
ValidationUtils.verifyVersion( entityVersion );
-
final EntityVersion ev = new EntityVersion( entityId, entityVersion );
final UniqueFieldEntry uniqueFieldEntry = new UniqueFieldEntry( entityVersion, field );
- return doWrite( collectionScope, value, new RowOp() {
+ ByteBuffer partitionKey = getPartitionKey(collectionScope.getApplication(), value.getEntityId().getType(),
+ field.getTypeName().toString(), field.getName(), field.getValue());
- @Override
- public void doLookup( final ColumnListMutation<EntityVersion> colMutation ) {
- colMutation.putColumn( ev, COL_VALUE );
- }
+ ByteBuffer logPartitionKey = getLogPartitionKey(collectionScope.getApplication(), value.getEntityId());
- @Override
- public void doLog( final ColumnListMutation<UniqueFieldEntry> colMutation ) {
- colMutation.putColumn( uniqueFieldEntry, COL_VALUE );
- }
- } );
+ if(ttl != null) {
+
+ Statement uniqueValueStatement = QueryBuilder.insertInto(TABLE_UNIQUE_VALUES)
+ .value("key", partitionKey)
+ .value("column1", serializeUniqueValueColumn(ev))
+ .value("value", DataType.serializeValue(COL_VALUE, ProtocolVersion.NEWEST_SUPPORTED))
+ .using(ttl);
+
+ batch.add(uniqueValueStatement);
+
+
+ }else{
+
+ Statement uniqueValueStatement = QueryBuilder.insertInto(TABLE_UNIQUE_VALUES)
+ .value("key", partitionKey)
+ .value("column1", serializeUniqueValueColumn(ev))
+ .value("value", DataType.serializeValue(COL_VALUE, ProtocolVersion.NEWEST_SUPPORTED));
+
+ batch.add(uniqueValueStatement);
+
+ }
+
+ // we always want to retain the log entry, so never write with the TTL
+ Statement uniqueValueLogStatement = QueryBuilder.insertInto(TABLE_UNIQUE_VALUES_LOG)
+ .value("key", logPartitionKey)
+ .value("column1", serializeUniqueValueLogColumn(uniqueFieldEntry))
+ .value("value", DataType.serializeValue(COL_VALUE, ProtocolVersion.NEWEST_SUPPORTED));
+
+ batch.add(uniqueValueLogStatement);
+
+
+
+ return batch;
+
}
@Override
- public MutationBatch write( final ApplicationScope collectionScope, final UniqueValue value,
- final int timeToLive ) {
-
- Preconditions.checkNotNull( value, "value is required" );
- Preconditions.checkArgument( timeToLive > 0, "timeToLive must be greater than 0 is required" );
-
- final Id entityId = value.getEntityId();
- final UUID entityVersion = value.getEntityVersion();
- final Field<?> field = value.getField();
-
- ValidationUtils.verifyIdentity( entityId );
- ValidationUtils.verifyVersion( entityVersion );
-
- final EntityVersion ev = new EntityVersion( entityId, entityVersion );
- final UniqueFieldEntry uniqueFieldEntry = new UniqueFieldEntry( entityVersion, field );
-
- return doWrite( collectionScope, value, new RowOp() {
-
- @Override
- public void doLookup( final ColumnListMutation<EntityVersion> colMutation ) {
- colMutation.putColumn( ev, COL_VALUE, timeToLive );
- }
-
-
- //we purposefully leave out TTL. Worst case we issue deletes against tombstoned columns
- //best case, we clean up an invalid secondary index entry when the logger is used
- @Override
- public void doLog( final ColumnListMutation<UniqueFieldEntry> colMutation ) {
- colMutation.putColumn( uniqueFieldEntry, COL_VALUE );
- }
- } );
- }
-
-
- @Override
- public MutationBatch delete( final ApplicationScope scope, UniqueValue value ) {
+ public BatchStatement deleteCQL( final ApplicationScope scope, UniqueValue value){
Preconditions.checkNotNull( value, "value is required" );
+ final BatchStatement batch = new BatchStatement();
final Id entityId = value.getEntityId();
final UUID entityVersion = value.getEntityVersion();
@@ -182,147 +192,183 @@
final EntityVersion ev = new EntityVersion( entityId, entityVersion );
final UniqueFieldEntry uniqueFieldEntry = new UniqueFieldEntry( entityVersion, field );
- return doWrite( scope, value, new RowOp() {
- @Override
- public void doLookup( final ColumnListMutation<EntityVersion> colMutation ) {
- colMutation.deleteColumn( ev );
- }
+ ByteBuffer partitionKey = getPartitionKey( scope.getApplication(), value.getEntityId().getType(),
+ value.getField().getTypeName().toString(), value.getField().getName(), value.getField().getValue());
+
+ ByteBuffer columnValue = serializeUniqueValueColumn(ev);
+
+ final Clause uniqueEqKey = QueryBuilder.eq("key", partitionKey );
+ final Clause uniqueEqColumn = QueryBuilder.eq("column1", columnValue );
+ Statement uniqueDelete = QueryBuilder.delete().from(TABLE_UNIQUE_VALUES).where(uniqueEqKey).and(uniqueEqColumn);
+ batch.add(uniqueDelete);
- @Override
- public void doLog( final ColumnListMutation<UniqueFieldEntry> colMutation ) {
- colMutation.deleteColumn( uniqueFieldEntry );
- }
- } );
- }
+
+ ByteBuffer logPartitionKey = getLogPartitionKey(scope.getApplication(), entityId);
+ ByteBuffer logColumnValue = serializeUniqueValueLogColumn(uniqueFieldEntry);
- /**
- * Do the column update or delete for the given column and row key
- *
- * @param applicationScope We need to use this when getting the keyspace
- * @param uniqueValue The unique value to write
- * @param op The operation to write
- */
- private MutationBatch doWrite( ApplicationScope applicationScope, UniqueValue uniqueValue, RowOp op ) {
- final MutationBatch batch = keyspace.prepareMutationBatch();
+ final Clause uniqueLogEqKey = QueryBuilder.eq("key", logPartitionKey );
+ final Clause uniqueLogEqColumn = QueryBuilder.eq("column1", logColumnValue );
- final Id applicationId = applicationScope.getApplication();
+ Statement uniqueLogDelete = QueryBuilder.delete()
+ .from(TABLE_UNIQUE_VALUES_LOG).where(uniqueLogEqKey).and( uniqueLogEqColumn);
- final FieldKey fieldKey = createUniqueValueKey( applicationId, uniqueValue.getEntityId().getType(), uniqueValue.getField() );
-
-
- op.doLookup( batch.withRow( CF_UNIQUE_VALUES, ScopedRowKey.fromKey( applicationId, fieldKey ) ) );
-
-
- final EntityKey entityKey = createEntityUniqueLogKey( applicationId, uniqueValue.getEntityId() );
-
- op.doLog( batch.withRow( CF_ENTITY_UNIQUE_VALUE_LOG,
- ScopedRowKey.fromKey( applicationId, entityKey ) ) );
+ batch.add(uniqueLogDelete);
if ( logger.isTraceEnabled() ) {
logger.trace( "Building batch statement for unique value entity={} version={} name={} value={} ",
- uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion(),
- uniqueValue.getField().getName(), uniqueValue.getField().getValue() );
+ value.getEntityId().getUuid(), value.getEntityVersion(),
+ value.getField().getName(), value.getField().getValue() );
}
+
return batch;
}
@Override
- public UniqueValueSet load( final ApplicationScope colScope, final String type, final Collection<Field> fields )
- throws ConnectionException {
- return load( colScope, ConsistencyLevel.valueOf( cassandraFig.getReadCL() ), type, fields, false);
+ public UniqueValueSet load( final ApplicationScope colScope, final String type, final Collection<Field> fields ) {
+
+ return load( colScope, ConsistencyLevel.valueOf( cassandraFig.getReadCl() ), type, fields, false );
+
}
@Override
public UniqueValueSet load( final ApplicationScope colScope, final String type, final Collection<Field> fields,
- boolean useReadRepair)
- throws ConnectionException {
- return load( colScope, ConsistencyLevel.valueOf( cassandraFig.getReadCL() ), type, fields, useReadRepair);
+ boolean useReadRepair) {
+
+ return load( colScope, ConsistencyLevel.valueOf( cassandraFig.getReadCl() ), type, fields, useReadRepair);
+
}
@Override
- public UniqueValueSet load(final ApplicationScope appScope, final ConsistencyLevel consistencyLevel,
- final String type, final Collection<Field> fields, boolean useReadRepair) throws ConnectionException {
+ public UniqueValueSet load( final ApplicationScope appScope,
+ final ConsistencyLevel consistencyLevel,
+ final String type, final Collection<Field> fields, boolean useReadRepair ) {
+
+
+ Preconditions.checkNotNull( fields, "fields are required" );
+ Preconditions.checkArgument( fields.size() > 0, "More than 1 field must be specified" );
+
+ return loadCQL(appScope, consistencyLevel, type, fields, useReadRepair);
+
+ }
+
+
+ private UniqueValueSet loadCQL( final ApplicationScope appScope,
+ final ConsistencyLevel consistencyLevel,
+ final String type, final Collection<Field> fields, boolean useReadRepair ) {
Preconditions.checkNotNull( fields, "fields are required" );
Preconditions.checkArgument( fields.size() > 0, "More than 1 field must be specified" );
- final List<ScopedRowKey<FieldKey>> keys = new ArrayList<>( fields.size() );
-
final Id applicationId = appScope.getApplication();
- for ( Field field : fields ) {
-
- final FieldKey key = createUniqueValueKey( applicationId, type, field );
+ // row key = app UUID + app type + entityType + field type + field name + field value
- final ScopedRowKey<FieldKey> rowKey =
- ScopedRowKey.fromKey( applicationId, key );
- keys.add( rowKey );
- }
+
+ //List<ByteBuffer> partitionKeys = new ArrayList<>( fields.size() );
final UniqueValueSetImpl uniqueValueSet = new UniqueValueSetImpl( fields.size() );
- Iterator<Row<ScopedRowKey<FieldKey>, EntityVersion>> results =
- keyspace.prepareQuery( CF_UNIQUE_VALUES ).setConsistencyLevel( consistencyLevel ).getKeySlice( keys )
- .withColumnRange(new RangeBuilder().setLimit(serializationFig.getMaxLoadSize()).build())
- .execute().getResult().iterator();
- if( !results.hasNext()){
- if(logger.isTraceEnabled()){
- logger.trace("No partitions returned for unique value lookup");
- }
- }
+ for ( Field field : fields ) {
+
+ //log.info(Bytes.toHexString(getPartitionKey(applicationId, type, field.getTypeName().toString(), field.getName(), field.getValue())));
+
+ //partitionKeys.add(getPartitionKey(applicationId, type, field.getTypeName().toString(), field.getName(), field.getValue()));
+
+ final Clause inKey = QueryBuilder.in("key", getPartitionKey(applicationId, type,
+ field.getTypeName().toString(), field.getName(), field.getValue()) );
+
+ final Statement statement = QueryBuilder.select().all().from(TABLE_UNIQUE_VALUES)
+ .where(inKey)
+ .setConsistencyLevel(consistencyLevel);
+
+ final ResultSet resultSet = session.execute(statement);
- while ( results.hasNext() )
+ Iterator<com.datastax.driver.core.Row> results = resultSet.iterator();
- {
-
- final Row<ScopedRowKey<FieldKey>, EntityVersion> unique = results.next();
-
- final Field field = parseRowKey( unique.getKey() );
-
- final Iterator<Column<EntityVersion>> columnList = unique.getColumns().iterator();
-
- //sanity check, nothing to do, skip it
- if ( !columnList.hasNext() ) {
+ if( !results.hasNext()){
if(logger.isTraceEnabled()){
- logger.trace("No cells exist in partition for unique value [{}={}]",
- field.getName(), field.getValue().toString());
+ logger.trace("No rows returned for unique value lookup of field: {}", field);
}
- continue;
}
List<UniqueValue> candidates = new ArrayList<>();
- /**
- * While iterating the columns, a rule is being enforced to only EVER return the oldest UUID. This means
- * the UUID with the oldest timestamp ( it was the original entity written for the unique value ).
- *
- * We do this to prevent cycling of unique value -> entity UUID mappings as this data is ordered by the
- * entity's version and not the entity's timestamp itself.
- *
- * If newer entity UUIDs are encountered, they are removed from the unique value tables, however their
- * backing serialized entity data is left in tact in case a cleanup / audit is later needed.
- */
- while (columnList.hasNext()) {
+ while( results.hasNext() ){
- final EntityVersion entityVersion = columnList.next().getName();
+ final com.datastax.driver.core.Row unique = results.next();
+ ByteBuffer partitionKey = unique.getBytes("key");
+ ByteBuffer column = unique.getBytesUnsafe("column1");
+
+ List<Object> keyContents = deserializePartitionKey(partitionKey);
+ List<Object> columnContents = deserializeUniqueValueColumn(column);
+
+ FieldTypeName fieldType;
+ String name;
+ String value;
+ if(this instanceof UniqueValueSerializationStrategyV2Impl) {
+
+
+ fieldType = FieldTypeName.valueOf((String) keyContents.get(3));
+ name = (String) keyContents.get(4);
+ value = (String) keyContents.get(5);
+
+ }else{
+
+
+ fieldType = FieldTypeName.valueOf((String) keyContents.get(5));
+ name = (String) keyContents.get(6);
+ value = (String) keyContents.get(7);
+
+
+ }
+
+ Field returnedField = getField(name, value, fieldType);
+
+
+ final EntityVersion entityVersion = new EntityVersion(
+ new SimpleId((UUID)columnContents.get(1), (String)columnContents.get(2)), (UUID)columnContents.get(0));
+// //sanity check, nothing to do, skip it
+// if ( !columnList.hasNext() ) {
+// if(logger.isTraceEnabled()){
+// logger.trace("No cells exist in partition for unique value [{}={}]",
+// field.getName(), field.getValue().toString());
+// }
+// continue;
+// }
+
+
+
+
+ /**
+ * While iterating the rows, a rule is enforced to only EVER return the oldest UUID for the field.
+ * This means the UUID with the oldest timestamp ( it was the original entity written for
+ * the unique value ).
+ *
+ * We do this to prevent cycling of unique value -> entity UUID mappings as this data is ordered by the
+ * entity's version and not the entity's timestamp itself.
+ *
+ * If newer entity UUIDs are encountered, they are removed from the unique value tables, however their
+ * backing serialized entity data is left intact in case a cleanup / audit is later needed.
+ */
+
final UniqueValue uniqueValue =
- new UniqueValueImpl(field, entityVersion.getEntityId(), entityVersion.getEntityVersion());
+ new UniqueValueImpl(returnedField, entityVersion.getEntityId(), entityVersion.getEntityVersion());
// set the initial candidate and move on
if (candidates.size() == 0) {
@@ -331,7 +377,7 @@
if (logger.isTraceEnabled()) {
logger.trace("First entry for unique value [{}={}] found for application [{}], adding " +
"entry with entity id [{}] and entity version [{}] to the candidate list and continuing",
- field.getName(), field.getValue().toString(), applicationId.getType(),
+ returnedField.getName(), returnedField.getValue().toString(), applicationId.getType(),
uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion());
}
@@ -343,7 +389,7 @@
// take only the first
if (logger.isTraceEnabled()) {
logger.trace("Read repair not enabled for this request of unique value [{}={}], breaking out" +
- " of cell loop", field.getName(), field.getValue().toString());
+ " of cell loop", returnedField.getName(), returnedField.getValue().toString());
}
break;
@@ -357,7 +403,7 @@
// do nothing, only versions can be newer and we're not worried about newer versions of same entity
if (logger.isTraceEnabled()) {
logger.trace("Current unique value [{}={}] entry has UUID [{}] equal to candidate UUID [{}]",
- field.getName(), field.getValue().toString(), uniqueValue.getEntityId().getUuid(),
+ returnedField.getName(), returnedField.getValue().toString(), uniqueValue.getEntityId().getUuid(),
candidates.get(candidates.size() -1));
}
@@ -369,19 +415,12 @@
// delete the duplicate from the unique value index
candidates.forEach(candidate -> {
- try {
+ logger.warn("Duplicate unique value [{}={}] found for application [{}], removing newer " +
+ "entry with entity id [{}] and entity version [{}]", returnedField.getName(),
+ returnedField.getValue().toString(), applicationId.getUuid(),
+ candidate.getEntityId().getUuid(), candidate.getEntityVersion());
- logger.warn("Duplicate unique value [{}={}] found for application [{}], removing newer " +
- "entry with entity id [{}] and entity version [{}]", field.getName(),
- field.getValue().toString(), applicationId.getUuid(),
- candidate.getEntityId().getUuid(), candidate.getEntityVersion());
-
- delete(appScope, candidate).execute();
-
- } catch (ConnectionException e) {
- logger.error( "Unable to connect to cassandra during duplicate repair of [{}={}]",
- field.getName(), field.getValue().toString() );
- }
+ session.execute(deleteCQL(appScope, candidate));
});
@@ -390,7 +429,7 @@
if (logger.isTraceEnabled()) {
logger.trace("Updating candidate unique value [{}={}] to entity id [{}] and " +
- "entity version [{}]", field.getName(), field.getValue().toString(),
+ "entity version [{}]", returnedField.getName(), returnedField.getValue().toString(),
uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion());
}
@@ -402,147 +441,186 @@
} else {
logger.warn("Duplicate unique value [{}={}] found for application [{}], removing newer entry " +
- "with entity id [{}] and entity version [{}].", field.getName(), field.getValue().toString(),
+ "with entity id [{}] and entity version [{}].", returnedField.getName(), returnedField.getValue().toString(),
applicationId.getUuid(), uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion());
// delete the duplicate from the unique value index
- delete(appScope, uniqueValue).execute();
+ session.execute(deleteCQL(appScope, uniqueValue));
}
}
+
}
- // take the last candidate ( should be the latest version) and add to the result set
-
- final UniqueValue returnValue = candidates.get(candidates.size() -1);
- if(logger.isTraceEnabled()){
- logger.trace("Adding unique value [{}={}] with entity id [{}] and entity version [{}] to response set",
- returnValue.getField().getName(), returnValue.getField().getValue().toString(),
- returnValue.getEntityId().getUuid(), returnValue.getEntityVersion());
+ if ( candidates.size() > 0 ) {
+ // take the last candidate ( should be the latest version) and add to the result set
+ final UniqueValue returnValue = candidates.get(candidates.size() - 1);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Adding unique value [{}={}] with entity id [{}] and entity version [{}] to response set",
+ returnValue.getField().getName(), returnValue.getField().getValue().toString(),
+ returnValue.getEntityId().getUuid(), returnValue.getEntityVersion());
+ }
+ uniqueValueSet.addValue(returnValue);
}
- uniqueValueSet.addValue(returnValue);
+
}
+
return uniqueValueSet;
+
}
+
+
@Override
public Iterator<UniqueValue> getAllUniqueFields( final ApplicationScope collectionScope, final Id entityId ) {
Preconditions.checkNotNull( collectionScope, "collectionScope is required" );
Preconditions.checkNotNull( entityId, "entity id is required" );
- final Id applicationId = collectionScope.getApplication();
+ Clause inKey = QueryBuilder.in("key", getLogPartitionKey(collectionScope.getApplication(), entityId));
- final EntityKey entityKey = createEntityUniqueLogKey( applicationId, entityId );
+ Statement statement = QueryBuilder.select().all().from(TABLE_UNIQUE_VALUES_LOG)
+ .where(inKey);
+
+ return new AllUniqueFieldsIterator(session, statement, entityId);
- final ScopedRowKey<EntityKey> rowKey =
- ScopedRowKey.fromKey( applicationId, entityKey );
-
-
- RowQuery<ScopedRowKey<EntityKey>, UniqueFieldEntry> query =
- keyspace.prepareQuery( CF_ENTITY_UNIQUE_VALUE_LOG ).getKey( rowKey )
- .withColumnRange( ( UniqueFieldEntry ) null, null, false, serializationFig.getBufferSize() );
-
- return new ColumnNameIterator( query, new UniqueEntryParser( entityId ), false );
}
- /**
- * Simple callback to perform puts and deletes with a common row setup code
- */
- private interface RowOp {
-
- /**
- * Execute the mutation into the lookup CF_UNIQUE_VALUES row
- */
- void doLookup( ColumnListMutation<EntityVersion> colMutation );
-
- /**
- * Execute the mutation into the lCF_ENTITY_UNIQUE_VALUESLUE row
- */
- void doLog( ColumnListMutation<UniqueFieldEntry> colMutation );
- }
-
-
- /**
- * Converts raw columns to the expected output
- */
- private static final class UniqueEntryParser implements ColumnParser<UniqueFieldEntry, UniqueValue> {
-
- private final Id entityId;
-
-
- private UniqueEntryParser( final Id entityId ) {this.entityId = entityId;}
-
-
- @Override
- public UniqueValue parseColumn( final Column<UniqueFieldEntry> column ) {
- final UniqueFieldEntry entry = column.getName();
-
- return new UniqueValueImpl( entry.getField(), entityId, entry.getVersion() );
- }
- }
-
@Override
- public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
+ public abstract Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies();
- final MultiTenantColumnFamilyDefinition uniqueLookupCF =
- new MultiTenantColumnFamilyDefinition( CF_UNIQUE_VALUES, BytesType.class.getSimpleName(),
- ColumnTypes.DYNAMIC_COMPOSITE_TYPE, BytesType.class.getSimpleName(),
- MultiTenantColumnFamilyDefinition.CacheOption.KEYS );
+ @Override
+ public abstract Collection<TableDefinition> getTables();
- final MultiTenantColumnFamilyDefinition uniqueLogCF =
- new MultiTenantColumnFamilyDefinition( CF_ENTITY_UNIQUE_VALUE_LOG, BytesType.class.getSimpleName(),
- ColumnTypes.DYNAMIC_COMPOSITE_TYPE, BytesType.class.getSimpleName(),
- MultiTenantColumnFamilyDefinition.CacheOption.KEYS );
+ /**
+ * Get the CQL table definition for the unique values table
+ */
+ protected abstract TableDefinition getUniqueValuesTable();
- return Arrays.asList( uniqueLookupCF, uniqueLogCF );
+
+ protected abstract List<Object> deserializePartitionKey(ByteBuffer bb);
+
+ protected abstract ByteBuffer serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry);
+
+ protected abstract ByteBuffer getPartitionKey(Id applicationId, String entityType, String fieldType, String fieldName, Object fieldValue );
+
+ protected abstract ByteBuffer getLogPartitionKey(final Id applicationId, final Id uniqueValueId);
+
+ protected abstract ByteBuffer serializeUniqueValueColumn(EntityVersion entityVersion);
+
+ protected abstract List<Object> deserializeUniqueValueColumn(ByteBuffer bb);
+
+ protected abstract List<Object> deserializeUniqueValueLogColumn(ByteBuffer bb);
+
+
+
+ /**
+ * Get the CQL table definition for the unique values log table
+ */
+ protected abstract TableDefinition getEntityUniqueLogTable();
+
+
+ /**
+ * Iterator over the unique values recorded for a single entity. The supplied statement is executed
+ * lazily, on the first call to {@link #hasNext()} or {@link #next()}, and the "column1" blob of each
+ * returned row is decoded into a {@link UniqueValue} (version, field name, field value, field type).
+ */
+ public class AllUniqueFieldsIterator implements Iterable<UniqueValue>, Iterator<UniqueValue> {
+
+ private final Session session;
+ private final Statement query;
+ private final Id entityId;
+
+ // stays null until the query has been executed
+ private Iterator<Row> sourceIterator;
+
+
+
+ public AllUniqueFieldsIterator( final Session session, final Statement query, final Id entityId){
+
+ this.session = session;
+ this.query = query;
+ this.entityId = entityId;
+
+ }
+
+
+ @Override
+ public Iterator<UniqueValue> iterator() {
+ return this;
+ }
+
+ @Override
+ public boolean hasNext() {
+
+ ensureQueryExecuted();
+
+ return sourceIterator.hasNext();
+ }
+
+ @Override
+ public UniqueValue next() {
+
+ // next() may be called without a preceding hasNext(); make sure the query has run so we
+ // never dereference a null source iterator
+ ensureQueryExecuted();
+
+ com.datastax.driver.core.Row next = sourceIterator.next();
+
+ ByteBuffer column = next.getBytesUnsafe("column1");
+
+ List<Object> columnContents = deserializeUniqueValueLogColumn(column);
+
+ UUID version = (UUID) columnContents.get(0);
+ String name = (String) columnContents.get(1);
+ String value = (String) columnContents.get(2);
+ FieldTypeName fieldType = FieldTypeName.valueOf((String) columnContents.get(3));
+
+
+ return new UniqueValueImpl(getField(name, value, fieldType), entityId, version);
+
+ }
+
+ /**
+ * Execute the query once and keep the resulting row iterator for subsequent calls.
+ */
+ private void ensureQueryExecuted() {
+
+ if ( sourceIterator == null ) {
+ sourceIterator = session.execute(query).iterator();
+ }
+ }
+ }
+ private Field getField( String name, String value, FieldTypeName fieldType){
- /**
- * Get the column family for the unique fields
- */
- protected abstract MultiTenantColumnFamily<ScopedRowKey<FieldKey>, EntityVersion> getUniqueValuesCF();
+ Field field = null;
+ switch ( fieldType ) {
+ case BOOLEAN:
+ field = new BooleanField( name, Boolean.parseBoolean( value ) );
+ break;
+ case DOUBLE:
+ field = new DoubleField( name, Double.parseDouble( value ) );
+ break;
+ case FLOAT:
+ field = new FloatField( name, Float.parseFloat( value ) );
+ break;
+ case INTEGER:
+ field = new IntegerField( name, Integer.parseInt( value ) );
+ break;
+ case LONG:
+ field = new LongField( name, Long.parseLong( value ) );
+ break;
+ case STRING:
+ field = new StringField( name, value );
+ break;
+ case UUID:
+ field = new UUIDField( name, UUID.fromString( value ) );
+ break;
+ }
- /**
- * Generate a key that is compatible with the column family
- *
- * @param applicationId The applicationId
- * @param type The type in the field
- * @param field The field we're creating the key for
- */
- protected abstract FieldKey createUniqueValueKey(final Id applicationId, final String type, final Field field );
+ return field;
- /**
- * Parse the row key into the field
- * @param rowKey
- * @return
- */
- protected abstract Field parseRowKey(final ScopedRowKey<FieldKey> rowKey);
-
-
- /**
- * Get the column family for the unique field CF
- */
- protected abstract MultiTenantColumnFamily<ScopedRowKey<EntityKey>, UniqueFieldEntry> getEntityUniqueLogCF();
-
- /**
- * Generate a key that is compatible with the column family
- *
- * @param applicationId The applicationId
- * @param uniqueValueId The uniqueValue
- */
- protected abstract EntityKey createEntityUniqueLogKey(final Id applicationId, final Id uniqueValueId );
-
+ }
private class UniqueValueComparator implements Comparator<UniqueValue> {
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
index f971b23..61f0f80 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
@@ -24,11 +24,14 @@
import java.util.Collections;
import java.util.Iterator;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.CollectionMigrationPlugin;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.core.migration.data.MigrationInfoCache;
import org.apache.usergrid.persistence.core.migration.data.MigrationRelationship;
import org.apache.usergrid.persistence.core.migration.data.VersionedMigrationSet;
@@ -38,71 +41,46 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.model.ConsistencyLevel;
@Singleton
public class UniqueValueSerializationStrategyProxyImpl implements UniqueValueSerializationStrategy {
- protected final Keyspace keyspace;
private final VersionedMigrationSet<UniqueValueSerializationStrategy> versions;
private final MigrationInfoCache migrationInfoCache;
@Inject
- public UniqueValueSerializationStrategyProxyImpl( final Keyspace keyspace,
- final VersionedMigrationSet<UniqueValueSerializationStrategy>
+ public UniqueValueSerializationStrategyProxyImpl( final VersionedMigrationSet<UniqueValueSerializationStrategy>
allVersions,
final MigrationInfoCache migrationInfoCache ) {
- this.keyspace = keyspace;
this.migrationInfoCache = migrationInfoCache;
this.versions = allVersions;
}
+ /**
+ * Build the statement that writes the unique value with the given time to live. While a data
+ * migration is in progress the value is written through both the old and the new strategy.
+ */
@Override
- public MutationBatch write( final ApplicationScope applicationScope, final UniqueValue uniqueValue ) {
+ public BatchStatement writeCQL(final ApplicationScope applicationScope, final UniqueValue uniqueValue,
+ final int timeToLive ){
+
final MigrationRelationship<UniqueValueSerializationStrategy> migration = getMigrationRelationShip();
if ( migration.needsMigration() ) {
- final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
+ // the statements are only built here, not executed; return both in one batch so the
+ // caller actually writes to the old and the new version (same approach as deleteCQL)
+ final BatchStatement batch = new BatchStatement();
- aggregateBatch.mergeShallow( migration.from.write( applicationScope, uniqueValue ) );
- aggregateBatch.mergeShallow( migration.to.write( applicationScope, uniqueValue ) );
-
- return aggregateBatch;
+ batch.add( migration.from.writeCQL( applicationScope, uniqueValue, timeToLive ) );
+ batch.add( migration.to.writeCQL( applicationScope, uniqueValue, timeToLive ) );
+
+ return batch;
}
- return migration.to.write( applicationScope, uniqueValue );
+ return migration.to.writeCQL( applicationScope, uniqueValue, timeToLive );
}
- @Override
- public MutationBatch write( final ApplicationScope applicationScope, final UniqueValue uniqueValue,
- final int timeToLive ) {
- final MigrationRelationship<UniqueValueSerializationStrategy> migration = getMigrationRelationShip();
-
- if ( migration.needsMigration() ) {
- final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
-
- aggregateBatch.mergeShallow( migration.from.write( applicationScope, uniqueValue, timeToLive ) );
- aggregateBatch.mergeShallow( migration.to.write( applicationScope, uniqueValue, timeToLive ) );
-
- return aggregateBatch;
- }
-
- return migration.to.write( applicationScope, uniqueValue, timeToLive );
- }
-
@Override
public UniqueValueSet load( final ApplicationScope applicationScope, final String type,
- final Collection<Field> fields ) throws ConnectionException {
+ final Collection<Field> fields ) {
final MigrationRelationship<UniqueValueSerializationStrategy> migration = getMigrationRelationShip();
@@ -115,7 +93,7 @@
@Override
public UniqueValueSet load( final ApplicationScope applicationScope, final String type,
- final Collection<Field> fields, boolean useReadRepair ) throws ConnectionException {
+ final Collection<Field> fields, boolean useReadRepair ) {
final MigrationRelationship<UniqueValueSerializationStrategy> migration = getMigrationRelationShip();
@@ -128,8 +106,9 @@
@Override
- public UniqueValueSet load(final ApplicationScope applicationScope, final ConsistencyLevel consistencyLevel,
- final String type, final Collection<Field> fields, boolean useReadRepair) throws ConnectionException {
+ public UniqueValueSet load( final ApplicationScope applicationScope, final ConsistencyLevel consistencyLevel,
+ final String type, final Collection<Field> fields, boolean useReadRepair ) {
+
final MigrationRelationship<UniqueValueSerializationStrategy> migration = getMigrationRelationShip();
@@ -154,19 +133,19 @@
@Override
- public MutationBatch delete( final ApplicationScope applicationScope, final UniqueValue uniqueValue ) {
+ public BatchStatement deleteCQL( final ApplicationScope applicationScope, final UniqueValue uniqueValue ) {
final MigrationRelationship<UniqueValueSerializationStrategy> migration = getMigrationRelationShip();
if ( migration.needsMigration() ) {
- final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
+ final BatchStatement batch = new BatchStatement();
- aggregateBatch.mergeShallow( migration.from.delete( applicationScope, uniqueValue ) );
- aggregateBatch.mergeShallow( migration.to.delete( applicationScope, uniqueValue ) );
+ batch.add(migration.from.deleteCQL( applicationScope, uniqueValue ) );
+ batch.add(migration.to.deleteCQL( applicationScope, uniqueValue ) );
- return aggregateBatch;
+ return batch;
}
- return migration.to.delete( applicationScope, uniqueValue );
+ return migration.to.deleteCQL( applicationScope, uniqueValue );
}
@@ -184,6 +163,11 @@
return Collections.emptyList();
}
+ @Override
+ public Collection<TableDefinition> getTables() {
+ return Collections.emptyList();
+ }
+
@Override
public int getImplementationVersion() {
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
index 6a1cb58..55ba011 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
@@ -20,25 +20,25 @@
package org.apache.usergrid.persistence.collection.serialization.impl;
-import java.util.Arrays;
-import java.util.Collection;
+import java.nio.ByteBuffer;
+import java.util.*;
-import org.apache.cassandra.db.marshal.BytesType;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.Session;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.serialization.impl.util.LegacyScopeUtils;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
-import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
-import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
-import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
+import org.apache.usergrid.persistence.core.CassandraConfig;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.datastax.CQLUtils;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.field.Field;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import com.netflix.astyanax.Keyspace;
/**
@@ -48,106 +48,371 @@
public class UniqueValueSerializationStrategyV1Impl extends UniqueValueSerializationStrategyImpl<CollectionPrefixedKey<Field>, CollectionPrefixedKey<Id>> {
- private static final CollectionScopedRowKeySerializer<Field> ROW_KEY_SER =
- new CollectionScopedRowKeySerializer<>( UniqueFieldRowKeySerializer.get() );
- private static final EntityVersionSerializer ENTITY_VERSION_SER = new EntityVersionSerializer();
-
- private static final MultiTenantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Field>>, EntityVersion>
- CF_UNIQUE_VALUES = new MultiTenantColumnFamily<>( "Unique_Values", ROW_KEY_SER, ENTITY_VERSION_SER );
+ private static final String UNIQUE_VALUES_TABLE = CQLUtils.quote("Unique_Values");
+ private static final Collection<String> UNIQUE_VALUES_PARTITION_KEYS = Collections.singletonList("key");
+ private static final Collection<String> UNIQUE_VALUES_COLUMN_KEYS = Collections.singletonList("column1");
+ private static final Map<String, DataType.Name> UNIQUE_VALUES_COLUMNS =
+ new HashMap<String, DataType.Name>() {{
+ put( "key", DataType.Name.BLOB );
+ put( "column1", DataType.Name.CUSTOM );
+ put( "value", DataType.Name.BLOB ); }};
+ private static final Map<String, String> UNIQUE_VALUES_CLUSTERING_ORDER =
+ new HashMap<String, String>(){{ put( "column1", "ASC" );}};
- private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
+ private static final String UNIQUE_VALUES_LOG_TABLE = CQLUtils.quote("Entity_Unique_Values");
+ private static final Collection<String> UNIQUE_VALUES_LOG_PARTITION_KEYS = Collections.singletonList("key");
+ private static final Collection<String> UNIQUE_VALUES_LOG_COLUMN_KEYS = Collections.singletonList("column1");
+ private static final Map<String, DataType.Name> UNIQUE_VALUES_LOG_COLUMNS =
+ new HashMap<String, DataType.Name>() {{
+ put( "key", DataType.Name.BLOB );
+ put( "column1", DataType.Name.CUSTOM );
+ put( "value", DataType.Name.BLOB ); }};
+ private static final Map<String, String> UNIQUE_VALUES_LOG_CLUSTERING_ORDER =
+ new HashMap<String, String>(){{ put( "column1", "ASC" ); }};
- private static final CollectionScopedRowKeySerializer<Id> ENTITY_ROW_KEY_SER =
- new CollectionScopedRowKeySerializer<>( ID_SER );
+ private final static TableDefinition uniqueValues =
+ new TableDefinition( UNIQUE_VALUES_TABLE, UNIQUE_VALUES_PARTITION_KEYS, UNIQUE_VALUES_COLUMN_KEYS,
+ UNIQUE_VALUES_COLUMNS, TableDefinition.CacheOption.KEYS, UNIQUE_VALUES_CLUSTERING_ORDER);
+ private final static TableDefinition uniqueValuesLog =
+ new TableDefinition( UNIQUE_VALUES_LOG_TABLE, UNIQUE_VALUES_LOG_PARTITION_KEYS, UNIQUE_VALUES_LOG_COLUMN_KEYS,
+ UNIQUE_VALUES_LOG_COLUMNS, TableDefinition.CacheOption.KEYS, UNIQUE_VALUES_LOG_CLUSTERING_ORDER);
- private static final MultiTenantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Id>>, UniqueFieldEntry>
- CF_ENTITY_UNIQUE_VALUE_LOG =
- new MultiTenantColumnFamily<>( "Entity_Unique_Values", ENTITY_ROW_KEY_SER, UniqueFieldEntrySerializer.get() );
/**
* Construct serialization strategy for keyspace.
*
- * @param keyspace Keyspace in which to store Unique Values.
* @param cassandraFig The cassandra configuration
* @param serializationFig The serialization configuration
*/
@Inject
- public UniqueValueSerializationStrategyV1Impl( final Keyspace keyspace, final CassandraFig cassandraFig,
- final SerializationFig serializationFig ) {
- super( keyspace, cassandraFig, serializationFig );
+ public UniqueValueSerializationStrategyV1Impl( final CassandraFig cassandraFig,
+ final SerializationFig serializationFig,
+ final Session session,
+ final CassandraConfig cassandraConfig) {
+ super( cassandraFig, serializationFig, session, cassandraConfig );
}
@Override
public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
- final MultiTenantColumnFamilyDefinition uniqueLookupCF =
- new MultiTenantColumnFamilyDefinition( CF_UNIQUE_VALUES, BytesType.class.getSimpleName(),
- ColumnTypes.DYNAMIC_COMPOSITE_TYPE, BytesType.class.getSimpleName(),
- MultiTenantColumnFamilyDefinition.CacheOption.KEYS );
+ return Collections.emptyList();
- final MultiTenantColumnFamilyDefinition uniqueLogCF =
- new MultiTenantColumnFamilyDefinition( CF_ENTITY_UNIQUE_VALUE_LOG, BytesType.class.getSimpleName(),
- ColumnTypes.DYNAMIC_COMPOSITE_TYPE, BytesType.class.getSimpleName(),
- MultiTenantColumnFamilyDefinition.CacheOption.KEYS );
+ }
- return Arrays.asList( uniqueLookupCF, uniqueLogCF );
+ @Override
+ public Collection<TableDefinition> getTables() {
+
+ final TableDefinition uniqueValues = getUniqueValuesTable();
+ final TableDefinition uniqueValuesLog = getEntityUniqueLogTable();
+
+ return Arrays.asList( uniqueValues, uniqueValuesLog );
+
+ }
+
+
+
+ @Override
+ protected TableDefinition getUniqueValuesTable(){
+
+ return uniqueValues;
}
@Override
- protected MultiTenantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Field>>, EntityVersion> getUniqueValuesCF() {
- return CF_UNIQUE_VALUES;
+ protected TableDefinition getEntityUniqueLogTable(){
+
+ return uniqueValuesLog;
+
}
@Override
- protected MultiTenantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Id>>, UniqueFieldEntry>
- getEntityUniqueLogCF() {
- return CF_ENTITY_UNIQUE_VALUE_LOG;
+ protected List<Object> deserializePartitionKey(ByteBuffer bb){
+
+
+ /**
+ * List<Object> keys = new ArrayList<>(8);
+ keys.add(0, appUUID);
+ keys.add(1, applicationType);
+ keys.add(2, appUUID);
+ keys.add(3, applicationType);
+ keys.add(4, entityType);
+ keys.add(5, fieldType);
+ keys.add(6, fieldName);
+ keys.add(7, fieldValueString);
+
+ */
+
+ int count = 0;
+ List<Object> stuff = new ArrayList<>();
+ while(bb.hasRemaining()){
+ ByteBuffer data = CQLUtils.getWithShortLength(bb);
+ if(count == 0 || count == 2){
+ stuff.add(DataType.uuid().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+ }else{
+ stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+ }
+ byte equality = bb.get(); // we don't use this but take the equality byte off the buffer
+ count++;
+ }
+
+ return stuff;
+
}
-
+ /**
+ * Serialize a unique field entry into the legacy dynamic-composite column layout used by the
+ * unique values log table. Components are written in the order version, field name, lower-cased
+ * field value, field type name. Each component is a comparator header, a 2 byte length, the
+ * serialized value and a 1 byte equality marker.
+ */
@Override
- protected CollectionPrefixedKey<Field> createUniqueValueKey( final Id applicationId,
- final String type, final Field field) {
+ protected ByteBuffer serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){
+
+ /**
+ * final UUID version = value.getVersion();
+ final Field<?> field = value.getField();
+
+ final FieldTypeName fieldType = field.getTypeName();
+ final String fieldValue = field.getValue().toString().toLowerCase();
- final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( type );
+ DynamicComposite composite = new DynamicComposite( );
+
+ //we want to sort ascending to descending by version
+ composite.addComponent( version, UUID_SERIALIZER, ColumnTypes.UUID_TYPE_REVERSED);
+ composite.addComponent( field.getName(), STRING_SERIALIZER );
+ composite.addComponent( fieldValue, STRING_SERIALIZER );
+ composite.addComponent( fieldType.name() , STRING_SERIALIZER);
+ */
+
+ // values are serialized as strings, not sure why, and always lower cased
+ String fieldValueString = fieldEntry.getField().getValue().toString().toLowerCase();
- final CollectionPrefixedKey<Field> uniquePrefixedKey =
- new CollectionPrefixedKey<>( collectionName, applicationId, field );
+ List<Object> keys = new ArrayList<>(4);
+ keys.add(fieldEntry.getVersion());
+ keys.add(fieldEntry.getField().getName());
+ keys.add(fieldValueString);
+ keys.add(fieldEntry.getField().getTypeName().name());
- return uniquePrefixedKey;
+ // only strings are being serialized other than UUIDs here
+ final String stringComparator = "UTF8Type";
+ final ByteBuffer stringComparatorBytes =
+ DataType.serializeValue(stringComparator, ProtocolVersion.NEWEST_SUPPORTED);
+
+ // The version component carries a 2 byte aliased comparator header instead of a comparator name:
+ // the high bit (0x8000) is set and the low byte is 85, the byte value for UUIDType in astyanax
+ // used in legacy data. The deserializer skips the name bytes whenever that high bit is set.
+ final short versionComparatorHeader = (short) (0x8000 | (byte) 85);
+
+ // serialize every value first so the buffer is sized from real byte lengths; String.length()
+ // counts chars and would under-allocate for non-ASCII text
+ List<ByteBuffer> serializedKeys = new ArrayList<>(keys.size());
+ int size = 0;
+
+ for (Object key : keys) {
+
+ ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+ if (kb == null) {
+ kb = ByteBuffer.allocate(0);
+ }
+ serializedKeys.add(kb);
+
+ // comparator header: 2 bytes for the aliased version, otherwise 2 byte length + comparator name
+ size += key.equals(fieldEntry.getVersion()) ? 2 : 2 + stringComparatorBytes.remaining();
+
+ // 2 byte length short + the item + 1 byte equality
+ size += 2 + kb.remaining() + 1;
+ }
+
+ ByteBuffer stuff = ByteBuffer.allocate(size);
+
+ for (int i = 0; i < keys.size(); i++) {
+
+ final ByteBuffer kb = serializedKeys.get(i);
+
+ if(keys.get(i).equals(fieldEntry.getVersion())) {
+ stuff.putShort(versionComparatorHeader);
+ }else{
+ stuff.putShort((short)stringComparator.length());
+ stuff.put(stringComparatorBytes.duplicate());
+ }
+
+ // put a short that indicates how big the buffer is for this item
+ stuff.putShort((short) kb.remaining());
+
+ // put the actual item
+ stuff.put(kb.slice());
+
+ // put an equality byte ( again not used, but part of the legacy thrift Astyanax schema)
+ stuff.put((byte) 0);
+
+ }
+
+ stuff.flip();
+ return stuff.duplicate();
+
+ }
-
@Override
- protected Field parseRowKey( final ScopedRowKey<CollectionPrefixedKey<Field>> rowKey ) {
- return rowKey.getKey().getSubKey();
+ protected ByteBuffer getPartitionKey(Id applicationId, String entityType, String fieldType, String fieldName, Object fieldValue ){
+
+ return serializeKey(applicationId.getUuid(), applicationId.getType(),
+ entityType, fieldType, fieldName, fieldValue);
+
}
+ @Override
+ protected ByteBuffer getLogPartitionKey(final Id applicationId, final Id uniqueValueId){
+
+ return serializeLogKey(applicationId.getUuid(), applicationId.getType(),
+ uniqueValueId.getUuid(), uniqueValueId.getType());
+
+ }
+ /**
+ * Serialize an entity version into the legacy dynamic-composite column layout used by the unique
+ * values table. Components are written in the order entity version, entity UUID, entity type. Each
+ * component is a 2 byte comparator-name length, the comparator name, a 2 byte value length, the
+ * serialized value and a 1 byte equality marker.
+ */
@Override
- protected CollectionPrefixedKey<Id> createEntityUniqueLogKey( final Id applicationId,
- final Id uniqueValueId ) {
+ protected ByteBuffer serializeUniqueValueColumn(EntityVersion entityVersion){
+
+ /**
+ * final Id entityId = ev.getEntityId();
+ final UUID entityUuid = entityId.getUuid();
+ final String entityType = entityId.getType();
+
+ CompositeBuilder builder = Composites.newDynamicCompositeBuilder();
+
+ builder.addUUID( entityVersion );
+ builder.addUUID( entityUuid );
+ builder.addString(entityType );
+ */
+
+ List<Object> keys = new ArrayList<>(3);
+ keys.add(entityVersion.getEntityVersion());
+ keys.add(entityVersion.getEntityId().getUuid());
+ keys.add(entityVersion.getEntityId().getType());
+
+ // serialize comparator names and values first so the buffer is sized from real byte lengths;
+ // String.length() counts chars and would under-allocate for non-ASCII text
+ List<ByteBuffer> comparators = new ArrayList<>(keys.size());
+ List<ByteBuffer> values = new ArrayList<>(keys.size());
+ int size = 0;
+
+ for (Object key : keys) {
+
+ // if it's not a UUID, the only other thing we're serializing is text
+ final String comparator = key instanceof UUID ? "UUIDType" : "UTF8Type";
+ final ByteBuffer cb = DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED);
+
+ ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+ if (kb == null) {
+ kb = ByteBuffer.allocate(0);
+ }
+
+ comparators.add(cb);
+ values.add(kb);
+
+ // 2 byte comparator short + comparator + 2 byte length short + the item + 1 byte equality
+ size += 2 + cb.remaining() + 2 + kb.remaining() + 1;
+ }
+
+ ByteBuffer stuff = ByteBuffer.allocate(size);
+
+ for (int i = 0; i < keys.size(); i++) {
+
+ final ByteBuffer cb = comparators.get(i);
+ final ByteBuffer kb = values.get(i);
+
+ // we always add the comparator to the buffer as well
+ stuff.putShort((short) cb.remaining());
+ stuff.put(cb.slice());
+
+ // put a short that indicates how big the buffer is for this item
+ stuff.putShort((short) kb.remaining());
+
+ // put the actual item
+ stuff.put(kb.slice());
+
+ // put an equality byte ( again not used, but part of the legacy thrift Astyanax schema)
+ stuff.put((byte) 0);
- final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( uniqueValueId.getType() );
+ }
+
+ stuff.flip();
+ return stuff.duplicate();
+
+ }
+
+ @Override
+ protected List<Object> deserializeUniqueValueColumn(ByteBuffer bb){
+
+ List<Object> stuff = new ArrayList<>();
+ int count = 0;
+ while(bb.hasRemaining()){
+
+ // pull off the custom comparator (per Astyanax deserialize)
+ int e = CQLUtils.getShortLength(bb);
+ if((e & '耀') == 0) {
+ CQLUtils.getBytes(bb, e);
+ } else {
+ // do nothing
+ }
+
+ ByteBuffer data = CQLUtils.getWithShortLength(bb);
- final CollectionPrefixedKey<Id> collectionPrefixedEntityKey =
- new CollectionPrefixedKey<>( collectionName, applicationId, uniqueValueId );
+ // first two composites are UUIDs, rest are strings
+ if(count == 0) {
+ stuff.add(new UUID(data.getLong(), data.getLong()));
+ }else if(count ==1){
+ stuff.add(new UUID(data.getLong(), data.getLong()));
+ }else{
+ stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+ }
+
+ byte equality = bb.get(); // we don't use this but take the equality byte off the buffer
+
+ count++;
+ }
+
+ return stuff;
+
+ }
+
+ @Override
+ protected List<Object> deserializeUniqueValueLogColumn(ByteBuffer bb){
+ /**
+ * List<Object> keys = new ArrayList<>(4);
+ keys.add(fieldEntry.getVersion());
+ keys.add(fieldEntry.getField().getName());
+ keys.add(fieldValueString);
+ keys.add(fieldEntry.getField().getTypeName().name());
+ */
- return collectionPrefixedEntityKey;
+ List<Object> stuff = new ArrayList<>();
+ int count = 0;
+ while(bb.hasRemaining()){
+
+ // pull off the custom comparator (per Astyanax deserialize)
+ int e = CQLUtils.getShortLength(bb);
+ if((e & '耀') == 0) {
+ CQLUtils.getBytes(bb, e);
+ } else {
+ // do nothing
+ }
+
+ ByteBuffer data = CQLUtils.getWithShortLength(bb);
+
+
+ // first composite is a UUID, rest are strings
+ if(count == 0) {
+ stuff.add(new UUID(data.getLong(), data.getLong()));
+ }else{
+ stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+ }
+
+ byte equality = bb.get(); // we don't use this but take the equality byte off the buffer
+
+ count++;
+ }
+
+ return stuff;
+
}
@@ -155,4 +420,113 @@
public int getImplementationVersion() {
return CollectionDataVersions.INITIAL.getVersion();
}
+
+
+
+    /**
+     * Builds the partition key for a unique value row.
+     *
+     * Components, in order: app UUID, app type, app UUID, app type, collection name, field type, field name and
+     * the lower-cased string form of the field value. Each component is written as a 2 byte length, the
+     * serialized value and a 1 byte equality marker (always 0).
+     *
+     * @param appUUID         UUID of the application
+     * @param applicationType type of the application id
+     * @param entityType      entity type, used to derive the collection name
+     * @param fieldType       type name of the unique field
+     * @param fieldName       name of the unique field
+     * @param fieldValue      value of the unique field; must not be null
+     * @return a buffer positioned at 0 whose limit is the end of the key
+     */
+    private ByteBuffer serializeKey( UUID appUUID,
+                                     String applicationType,
+                                     String entityType,
+                                     String fieldType,
+                                     String fieldName,
+                                     Object fieldValue ){
+
+        final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( entityType );
+
+        // values are serialized as strings, not sure why, and always lower cased
+        final String fieldValueString = fieldValue.toString().toLowerCase();
+
+        final List<Object> keys = new ArrayList<>(8);
+        keys.add(appUUID);
+        keys.add(applicationType);
+        keys.add(appUUID);
+        keys.add(applicationType);
+        keys.add(collectionName);
+        keys.add(fieldType);
+        keys.add(fieldName);
+        keys.add(fieldValueString);
+
+        // Serialize first and size the buffer from the encoded lengths. String.length() counts UTF-16 chars,
+        // not encoded bytes, so sizing from it overflows the buffer once a component has a multi-byte character.
+        final List<ByteBuffer> parts = new ArrayList<>(keys.size());
+        int size = 0;
+        for (Object key : keys) {
+            ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+            if (kb == null) {
+                kb = ByteBuffer.allocate(0);
+            }
+            parts.add(kb);
+            // 2 byte length short + value + 1 byte equality
+            size += 2 + kb.remaining() + 1;
+        }
+
+        final ByteBuffer stuff = ByteBuffer.allocate(size);
+        for (ByteBuffer kb : parts) {
+            stuff.putShort((short) kb.remaining());
+            stuff.put(kb.slice());
+            stuff.put((byte) 0);
+        }
+        stuff.flip();
+        return stuff.duplicate();
+
+    }
+
+    /**
+     * Builds the partition key for the unique value log row of an entity.
+     *
+     * Components, in order: app UUID, app type, app UUID, app type, collection name, entity UUID and entity
+     * type. Each component is written as a 2 byte length, the serialized value and a 1 byte equality marker
+     * (always 0).
+     *
+     * @param appUUID         UUID of the application
+     * @param applicationType type of the application id
+     * @param entityId        UUID of the entity owning the unique values
+     * @param entityType      type of the entity, also used to derive the collection name
+     * @return a buffer positioned at 0 whose limit is the end of the key
+     */
+    private ByteBuffer serializeLogKey(UUID appUUID, String applicationType, UUID entityId, String entityType){
+
+        final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( entityType );
+
+        final List<Object> keys = new ArrayList<>(7);
+        keys.add(appUUID);
+        keys.add(applicationType);
+        keys.add(appUUID);
+        keys.add(applicationType);
+        keys.add(collectionName);
+        keys.add(entityId);
+        keys.add(entityType);
+
+        // Serialize first and size the buffer from the encoded lengths. String.length() counts UTF-16 chars,
+        // not encoded bytes, so sizing from it overflows the buffer once a component has a multi-byte character.
+        final List<ByteBuffer> parts = new ArrayList<>(keys.size());
+        int size = 0;
+        for (Object key : keys) {
+            ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+            if (kb == null) {
+                kb = ByteBuffer.allocate(0);
+            }
+            parts.add(kb);
+            // 2 byte length short + value + 1 byte equality
+            size += 2 + kb.remaining() + 1;
+        }
+
+        final ByteBuffer stuff = ByteBuffer.allocate(size);
+        for (ByteBuffer kb : parts) {
+            stuff.putShort((short) kb.remaining());
+            stuff.put(kb.slice());
+            stuff.put((byte) 0);
+        }
+        stuff.flip();
+        return stuff.duplicate();
+
+    }
+
+
}
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
index 40622a4..a5fceeb 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
@@ -20,25 +20,24 @@
package org.apache.usergrid.persistence.collection.serialization.impl;
-import java.util.Arrays;
-import java.util.Collection;
+import java.nio.ByteBuffer;
+import java.util.*;
-import org.apache.cassandra.db.marshal.BytesType;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
-import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
-import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
-import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
+import org.apache.usergrid.persistence.core.CassandraConfig;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
+import org.apache.usergrid.persistence.core.datastax.CQLUtils;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.field.Field;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import com.netflix.astyanax.Keyspace;
/**
@@ -47,87 +46,365 @@
@Singleton
public class UniqueValueSerializationStrategyV2Impl extends UniqueValueSerializationStrategyImpl<TypeField, Id> {
-
- private static final ScopedRowKeySerializer<TypeField> ROW_KEY_SER = new ScopedRowKeySerializer<>( UniqueTypeFieldRowKeySerializer.get() );
+ private static final String UNIQUE_VALUES_TABLE = CQLUtils.quote("Unique_Values_V2");
+ private static final Collection<String> UNIQUE_VALUES_PARTITION_KEYS = Collections.singletonList("key");
+ private static final Collection<String> UNIQUE_VALUES_COLUMN_KEYS = Collections.singletonList("column1");
+ private static final Map<String, DataType.Name> UNIQUE_VALUES_COLUMNS =
+ new HashMap<String, DataType.Name>() {{
+ put( "key", DataType.Name.BLOB );
+ put( "column1", DataType.Name.CUSTOM );
+ put( "value", DataType.Name.BLOB ); }};
+ private static final Map<String, String> UNIQUE_VALUES_CLUSTERING_ORDER =
+ new HashMap<String, String>(){{ put( "column1", "ASC" );}};
- private static final EntityVersionSerializer ENTITY_VERSION_SER = new EntityVersionSerializer();
+ private static final String UNIQUE_VALUES_LOG_TABLE = CQLUtils.quote("Entity_Unique_Values_V2");
+ private static final Collection<String> UNIQUE_VALUES_LOG_PARTITION_KEYS = Collections.singletonList("key");
+ private static final Collection<String> UNIQUE_VALUES_LOG_COLUMN_KEYS = Collections.singletonList("column1");
+ private static final Map<String, DataType.Name> UNIQUE_VALUES_LOG_COLUMNS =
+ new HashMap<String, DataType.Name>() {{
+ put( "key", DataType.Name.BLOB );
+ put( "column1", DataType.Name.CUSTOM );
+ put( "value", DataType.Name.BLOB ); }};
+ private static final Map<String, String> UNIQUE_VALUES_LOG_CLUSTERING_ORDER =
+ new HashMap<String, String>(){{ put( "column1", "ASC" );}};
- private static final MultiTenantColumnFamily<ScopedRowKey<TypeField>, EntityVersion>
- CF_UNIQUE_VALUES = new MultiTenantColumnFamily<>( "Unique_Values_V2", ROW_KEY_SER, ENTITY_VERSION_SER );
+ private final static TableDefinition uniqueValues =
+ new TableDefinition( UNIQUE_VALUES_TABLE, UNIQUE_VALUES_PARTITION_KEYS, UNIQUE_VALUES_COLUMN_KEYS,
+ UNIQUE_VALUES_COLUMNS, TableDefinition.CacheOption.KEYS, UNIQUE_VALUES_CLUSTERING_ORDER);
-
- private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
-
-
- private static final ScopedRowKeySerializer<Id> ENTITY_ROW_KEY_SER =
- new ScopedRowKeySerializer<>( ID_SER );
-
-
- private static final MultiTenantColumnFamily<ScopedRowKey<Id>, UniqueFieldEntry>
- CF_ENTITY_UNIQUE_VALUE_LOG =
- new MultiTenantColumnFamily<>( "Entity_Unique_Values_V2", ENTITY_ROW_KEY_SER, UniqueFieldEntrySerializer.get() );
+ private final static TableDefinition uniqueValuesLog =
+ new TableDefinition( UNIQUE_VALUES_LOG_TABLE, UNIQUE_VALUES_LOG_PARTITION_KEYS, UNIQUE_VALUES_LOG_COLUMN_KEYS,
+ UNIQUE_VALUES_LOG_COLUMNS, TableDefinition.CacheOption.KEYS, UNIQUE_VALUES_LOG_CLUSTERING_ORDER);
/**
* Construct serialization strategy for keyspace.
*
- * @param keyspace Keyspace in which to store Unique Values.
* @param cassandraFig The cassandra configuration
* @param serializationFig The serialization configuration
+ *
*/
@Inject
- public UniqueValueSerializationStrategyV2Impl( final Keyspace keyspace, final CassandraFig cassandraFig,
- final SerializationFig serializationFig ) {
- super( keyspace, cassandraFig, serializationFig );
+ public UniqueValueSerializationStrategyV2Impl( final CassandraFig cassandraFig,
+ final SerializationFig serializationFig,
+ final Session session,
+ final CassandraConfig cassandraConfig) {
+ super( cassandraFig, serializationFig, session, cassandraConfig );
}
@Override
public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
- final MultiTenantColumnFamilyDefinition uniqueLookupCF =
- new MultiTenantColumnFamilyDefinition( CF_UNIQUE_VALUES, BytesType.class.getSimpleName(),
- ColumnTypes.DYNAMIC_COMPOSITE_TYPE, BytesType.class.getSimpleName(),
- MultiTenantColumnFamilyDefinition.CacheOption.KEYS );
+ return Collections.emptyList();
- final MultiTenantColumnFamilyDefinition uniqueLogCF =
- new MultiTenantColumnFamilyDefinition( CF_ENTITY_UNIQUE_VALUE_LOG, BytesType.class.getSimpleName(),
- ColumnTypes.DYNAMIC_COMPOSITE_TYPE, BytesType.class.getSimpleName(),
- MultiTenantColumnFamilyDefinition.CacheOption.KEYS );
+ }
- return Arrays.asList( uniqueLookupCF, uniqueLogCF );
+    /**
+     * Returns the table definitions managed by this strategy: the unique values table followed by the
+     * entity unique value log table.
+     */
+    @Override
+    public Collection<TableDefinition> getTables() {
+        return Arrays.asList( getUniqueValuesTable(), getEntityUniqueLogTable() );
+    }
@Override
- protected MultiTenantColumnFamily<ScopedRowKey<TypeField>, EntityVersion> getUniqueValuesCF() {
- return CF_UNIQUE_VALUES;
+ protected TableDefinition getUniqueValuesTable(){
+ return uniqueValues;
}
@Override
- protected MultiTenantColumnFamily<ScopedRowKey<Id>, UniqueFieldEntry>
- getEntityUniqueLogCF() {
- return CF_ENTITY_UNIQUE_VALUE_LOG;
+ protected TableDefinition getEntityUniqueLogTable(){
+ return uniqueValuesLog;
}
@Override
- protected TypeField createUniqueValueKey( final Id applicationId, final String type, final Field field) {
- return new TypeField(type,field);
+    protected List<Object> deserializePartitionKey(ByteBuffer bb){
+
+        // Key layout written by serializeKey: appUUID (UUID) followed by applicationType, entityType,
+        // fieldType, fieldName and fieldValueString (all strings). Every component is followed by one
+        // equality byte.
+        final List<Object> components = new ArrayList<>();
+
+        while (bb.hasRemaining()) {
+            final ByteBuffer component = CQLUtils.getWithShortLength(bb);
+
+            if (components.isEmpty()) {
+                // only the leading component is a UUID
+                components.add(DataType.uuid().deserialize(component.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+            } else {
+                components.add(DataType.text().deserialize(component.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+            }
+
+            // the equality byte is not used, but it has to be taken off the buffer
+            bb.get();
+        }
+
+        return components;
+
}
-
@Override
- protected Field parseRowKey( final ScopedRowKey<TypeField> rowKey ) {
- return rowKey.getKey().getField();
+    protected ByteBuffer serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){
+
+        // Builds the composite column name of a unique value log entry in the layout of the legacy
+        // DynamicComposite: version (UUID), field name, lower-cased field value, field type name.
+        // Per component: a 2 byte comparator header (plus the comparator name for strings), a 2 byte value
+        // length, the value, and a 1 byte equality marker (always 0).
+
+        // values are serialized as strings, not sure why, and always lower cased
+        final String fieldValueString = fieldEntry.getField().getValue().toString().toLowerCase();
+
+        final List<Object> keys = new ArrayList<>(4);
+        keys.add(fieldEntry.getVersion());
+        keys.add(fieldEntry.getField().getName());
+        keys.add(fieldValueString);
+        keys.add(fieldEntry.getField().getTypeName().name());
+
+        // only strings are being serialized other than UUIDs here
+        final String textComparator = "UTF8Type";
+
+        // 0x8000 flags an aliased comparator; 85 ('U') is the byte value for UUIDType in astyanax used in
+        // legacy data
+        final short uuidAliasHeader = (short) (0x8000 | 85);
+
+        // Serialize first and size the buffer from the encoded lengths. The written value is the lower-cased
+        // string encoded as bytes, so sizing from the char count of the original value can under-allocate.
+        final List<ByteBuffer> parts = new ArrayList<>(keys.size());
+        int size = 0;
+        for (Object key : keys) {
+            ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+            if (kb == null) {
+                kb = ByteBuffer.allocate(0);
+            }
+            parts.add(kb);
+
+            // comparator header: 2 bytes for the alias form, 2 bytes + name for the named form
+            size += (key instanceof UUID) ? 2 : 2 + textComparator.length();
+            // 2 byte length short + value + 1 byte equality
+            size += 2 + kb.remaining() + 1;
+        }
+
+        final ByteBuffer stuff = ByteBuffer.allocate(size);
+
+        for (int i = 0; i < keys.size(); i++) {
+
+            if (keys.get(i) instanceof UUID) {
+                stuff.putShort(uuidAliasHeader);
+            } else {
+                stuff.putShort((short) textComparator.length());
+                stuff.put(DataType.serializeValue(textComparator, ProtocolVersion.NEWEST_SUPPORTED));
+            }
+
+            final ByteBuffer kb = parts.get(i);
+
+            // put a short that indicates how big the buffer is for this item
+            stuff.putShort((short) kb.remaining());
+
+            // put the actual item
+            stuff.put(kb.slice());
+
+            // put an equality byte ( again not used but part of legacy thrift Astyanax schema)
+            stuff.put((byte) 0);
+        }
+
+        stuff.flip();
+        return stuff.duplicate();
+
}
+    /**
+     * Builds the unique value partition key for the given application and field by delegating to
+     * {@code serializeKey} with the application's UUID and type.
+     */
+    @Override
+    protected ByteBuffer getPartitionKey(Id applicationId, String entityType, String fieldType, String fieldName, Object fieldValue ){
+        final UUID appUuid = applicationId.getUuid();
+        final String appType = applicationId.getType();
+        return serializeKey(appUuid, appType, entityType, fieldType, fieldName, fieldValue);
+    }
@Override
- protected Id createEntityUniqueLogKey( final Id applicationId, final Id uniqueValueId ) {
- return uniqueValueId;
+    protected ByteBuffer getLogPartitionKey(final Id applicationId, final Id uniqueValueId){
+        // the log key is made of the application id followed by the id owning the unique values
+        final UUID appUuid = applicationId.getUuid();
+        final String appType = applicationId.getType();
+        final UUID ownerUuid = uniqueValueId.getUuid();
+        final String ownerType = uniqueValueId.getType();
+        return serializeLogKey(appUuid, appType, ownerUuid, ownerType);
+    }
+
+    /**
+     * Builds the composite column name for a unique value: entity version (UUID), entity UUID and entity type.
+     *
+     * Per component the buffer holds a 2 byte comparator name length, the comparator name, a 2 byte value
+     * length, the value, and a 1 byte equality marker (always 0).
+     *
+     * @param entityVersion the entity id and version to encode
+     * @return a buffer positioned at 0 whose limit is the end of the column name
+     */
+    @Override
+    protected ByteBuffer serializeUniqueValueColumn(EntityVersion entityVersion){
+
+        final List<Object> keys = new ArrayList<>(3);
+        keys.add(entityVersion.getEntityVersion());
+        keys.add(entityVersion.getEntityId().getUuid());
+        keys.add(entityVersion.getEntityId().getType());
+
+        // Serialize first and size the buffer from the encoded lengths. String.length() counts UTF-16 chars,
+        // not encoded bytes, so sizing from it overflows the buffer once the type has a multi-byte character.
+        final List<ByteBuffer> parts = new ArrayList<>(keys.size());
+        int size = 0;
+        for (Object key : keys) {
+            ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+            if (kb == null) {
+                kb = ByteBuffer.allocate(0);
+            }
+            parts.add(kb);
+            // 2 byte comparator short + comparator name + 2 byte length short + value + 1 byte equality
+            size += 2 + comparatorFor(key).length() + 2 + kb.remaining() + 1;
+        }
+
+        final ByteBuffer stuff = ByteBuffer.allocate(size);
+
+        for (int i = 0; i < keys.size(); i++) {
+
+            final String comparator = comparatorFor(keys.get(i));
+            stuff.putShort((short) comparator.length());
+            stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED));
+
+            final ByteBuffer kb = parts.get(i);
+
+            // put a short that indicates how big the buffer is for this item
+            stuff.putShort((short) kb.remaining());
+
+            // put the actual item
+            stuff.put(kb.slice());
+
+            // put an equality byte ( again not used but part of legacy thrift Astyanax schema)
+            stuff.put((byte) 0);
+        }
+
+        stuff.flip();
+        return stuff.duplicate();
+
+    }
+
+    /**
+     * Returns the comparator name written in front of a column component.
+     *
+     * The custom comparator aliases in CQLUtils.COMPOSITE_TYPE (more leftover from Astyanax) are only used for
+     * schema creation; the datastax driver has no alias concept, so the actual type names are written. Anything
+     * that is not a UUID is serialized as text.
+     */
+    private static String comparatorFor(final Object key) {
+        return key instanceof UUID ? "UUIDType" : "UTF8Type";
+    }
+
+    /**
+     * Decodes a unique value column name into its components, reading until the buffer is exhausted.
+     * The first two components are returned as UUIDs, every following component as text.
+     */
+    @Override
+    protected List<Object> deserializeUniqueValueColumn(ByteBuffer bb){
+
+        final List<Object> components = new ArrayList<>();
+
+        for (int index = 0; bb.hasRemaining(); index++) {
+
+            // pull off the custom comparator (per Astyanax deserialize): when the 0x8000 bit of the header is
+            // clear, the header is a byte count to skip; when it is set there is nothing to skip
+            final int header = CQLUtils.getShortLength(bb);
+            if ((header & 0x8000) == 0) {
+                CQLUtils.getBytes(bb, header);
+            }
+
+            final ByteBuffer component = CQLUtils.getWithShortLength(bb);
+
+            if (index < 2) {
+                components.add(new UUID(component.getLong(), component.getLong()));
+            } else {
+                components.add(DataType.text().deserialize(component.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+            }
+
+            // the equality byte is not used, but it has to be taken off the buffer
+            bb.get();
+        }
+
+        return components;
+
+    }
+
+    /**
+     * Decodes a unique value log column name into its components, reading until the buffer is exhausted.
+     * The components mirror serializeUniqueValueLogColumn: version (UUID), then field name, field value and
+     * field type name (all text).
+     */
+    @Override
+    protected List<Object> deserializeUniqueValueLogColumn(ByteBuffer bb){
+
+        final List<Object> components = new ArrayList<>();
+
+        for (int index = 0; bb.hasRemaining(); index++) {
+
+            // when the 0x8000 bit of the header is clear, the header is a byte count to skip; when it is set
+            // there is nothing to skip
+            final int header = CQLUtils.getShortLength(bb);
+            if ((header & 0x8000) == 0) {
+                CQLUtils.getBytes(bb, header);
+            }
+
+            final ByteBuffer component = CQLUtils.getWithShortLength(bb);
+
+            if (index == 0) {
+                components.add(new UUID(component.getLong(), component.getLong()));
+            } else {
+                components.add(DataType.text().deserialize(component.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+            }
+
+            // the equality byte is not used, but it has to be taken off the buffer
+            bb.get();
+        }
+
+        return components;
+
}
@@ -135,4 +412,87 @@
public int getImplementationVersion() {
return CollectionDataVersions.LOG_REMOVAL.getVersion();
}
+
+
+
+    /**
+     * Builds the partition key for a unique value row.
+     *
+     * row key = app UUID + app type + entity type + field type + field name + field value. Each component is
+     * written as a 2 byte length, the serialized value and a 1 byte equality marker (always 0).
+     *
+     * @param appUUID         UUID of the application
+     * @param applicationType type of the application id
+     * @param entityType      type of the entity owning the field
+     * @param fieldType       type name of the unique field
+     * @param fieldName       name of the unique field
+     * @param fieldValue      value of the unique field; must not be null
+     * @return a buffer positioned at 0 whose limit is the end of the key
+     */
+    private ByteBuffer serializeKey(UUID appUUID,
+                                    String applicationType,
+                                    String entityType,
+                                    String fieldType,
+                                    String fieldName,
+                                    Object fieldValue ){
+
+        // values are serialized as strings, not sure why, and always lower cased
+        final String fieldValueString = fieldValue.toString().toLowerCase();
+
+        final List<Object> keys = new ArrayList<>(6);
+        keys.add(appUUID);
+        keys.add(applicationType);
+        keys.add(entityType);
+        keys.add(fieldType);
+        keys.add(fieldName);
+        keys.add(fieldValueString);
+
+        // Serialize first and size the buffer from the encoded lengths. String.length() counts UTF-16 chars,
+        // not encoded bytes, so sizing from it overflows the buffer once a component has a multi-byte character.
+        final List<ByteBuffer> parts = new ArrayList<>(keys.size());
+        int size = 0;
+        for (Object key : keys) {
+            ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+            if (kb == null) {
+                kb = ByteBuffer.allocate(0);
+            }
+            parts.add(kb);
+            // 2 byte length short + value + 1 byte equality
+            size += 2 + kb.remaining() + 1;
+        }
+
+        final ByteBuffer stuff = ByteBuffer.allocate(size);
+        for (ByteBuffer kb : parts) {
+            stuff.putShort((short) kb.remaining());
+            stuff.put(kb.slice());
+            stuff.put((byte) 0);
+        }
+        stuff.flip();
+        return stuff.duplicate();
+
+    }
+
+    /**
+     * Builds the partition key for the unique value log row of an entity.
+     *
+     * Components, in order: app UUID, app type, entity UUID and entity type. Each component is written as a
+     * 2 byte length, the serialized value and a 1 byte equality marker (always 0).
+     *
+     * @param appUUID         UUID of the application
+     * @param applicationType type of the application id
+     * @param entityId        UUID of the entity owning the unique values
+     * @param entityType      type of the entity
+     * @return a buffer positioned at 0 whose limit is the end of the key
+     */
+    private ByteBuffer serializeLogKey(UUID appUUID, String applicationType, UUID entityId, String entityType){
+
+        final List<Object> keys = new ArrayList<>(4);
+        keys.add(appUUID);
+        keys.add(applicationType);
+        keys.add(entityId);
+        keys.add(entityType);
+
+        // Serialize first and size the buffer from the encoded lengths. String.length() counts UTF-16 chars,
+        // not encoded bytes, so sizing from it overflows the buffer once a component has a multi-byte character.
+        final List<ByteBuffer> parts = new ArrayList<>(keys.size());
+        int size = 0;
+        for (Object key : keys) {
+            ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+            if (kb == null) {
+                kb = ByteBuffer.allocate(0);
+            }
+            parts.add(kb);
+            // 2 byte length short + value + 1 byte equality
+            size += 2 + kb.remaining() + 1;
+        }
+
+        final ByteBuffer stuff = ByteBuffer.allocate(size);
+        for (ByteBuffer kb : parts) {
+            stuff.putShort((short) kb.remaining());
+            stuff.put(kb.slice());
+            stuff.put((byte) 0);
+        }
+        stuff.flip();
+        return stuff.duplicate();
+
+    }
+
}
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSetImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSetImpl.java
index 8dd9528..853913b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSetImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSetImpl.java
@@ -37,7 +37,11 @@
public void addValue(UniqueValue value){
- values.put( value.getField().getName(), value );
+ values.putIfAbsent( value.getField().getName(), value );
+ // ^^ putIfAbsent is important here: CQL returns column values differently than Astyanax/thrift because CQL
+ // has no 'column range' for each row slice, so all columns are returned. We don't want to overwrite the
+ // first column values retrieved.
+
}
@Override
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
index a110ed7..8d52d8b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
@@ -26,6 +26,8 @@
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.cql.BatchStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,6 +73,7 @@
private static final Logger logger = LoggerFactory.getLogger( MvccEntityDataMigrationImpl.class );
private final Keyspace keyspace;
+ private final Session session;
private final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions;
private final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3;
private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
@@ -80,12 +83,14 @@
@Inject
public MvccEntityDataMigrationImpl( final Keyspace keyspace,
+ final Session session,
final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions,
final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3,
final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
final MigrationDataProvider<EntityIdScope> migrationDataProvider ) {
this.keyspace = keyspace;
+ this.session = session;
this.allVersions = allVersions;
this.mvccEntitySerializationStrategyV3 = mvccEntitySerializationStrategyV3;
this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
@@ -163,8 +168,9 @@
final List<Id> toSaveIds = new ArrayList<>( entities.size() );
+ final com.datastax.driver.core.BatchStatement uniqueBatch = new com.datastax.driver.core.BatchStatement();
- for ( EntityToSaveMessage message : entities ) {
+ for ( EntityToSaveMessage message : entities ) {
try {
final MutationBatch entityRewrite = migration.to.write(message.scope, message.entity);
@@ -197,17 +203,14 @@
// time with
// no TTL so that cleanup can clean up
// older values
+
+
for (final Field field : EntityUtils.getUniqueFields(message.entity.getEntity().get())) {
final UniqueValue written = new UniqueValueImpl(field, entityId, version);
- final MutationBatch mb = uniqueValueSerializationStrategy.write(message.scope, written);
+ uniqueBatch.add(uniqueValueSerializationStrategy.writeCQL(message.scope, written, -1));
-
- // merge into our
- // existing mutation
- // batch
- totalBatch.mergeShallow(mb);
}
@@ -232,7 +235,7 @@
}
- executeBatch( migration.to.getImplementationVersion(), totalBatch, observer, atomicLong );
+ executeBatch( migration.to.getImplementationVersion(), totalBatch, observer, atomicLong, uniqueBatch );
//now run our cleanup task
@@ -252,10 +255,13 @@
}
- protected void executeBatch( final int targetVersion, final MutationBatch batch, final ProgressObserver po,
- final AtomicLong count ) {
+ protected void executeBatch(final int targetVersion, final MutationBatch batch, final ProgressObserver po,
+ final AtomicLong count, com.datastax.driver.core.BatchStatement uniqueBatch) {
try {
+
batch.execute();
+ session.execute(uniqueBatch);
+
po.update( targetVersion, "Finished copying " + count + " entities to the new format" );
}
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
index 2cad32c..ed88ba6 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
@@ -18,6 +18,9 @@
*/
package org.apache.usergrid.persistence.collection.uniquevalues;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.querybuilder.Batch;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.MutationBatch;
@@ -43,13 +46,17 @@
public class UniqueValuesTableImpl implements UniqueValuesTable {
private static final Logger logger = LoggerFactory.getLogger( UniqueValuesTableImpl.class );
- final UniqueValueSerializationStrategy strat;
- final UniqueValuesFig uniqueValuesFig;
+ private final UniqueValueSerializationStrategy strat;
+ private final UniqueValuesFig uniqueValuesFig;
+ private final Session session;
@Inject
- public UniqueValuesTableImpl( final UniqueValueSerializationStrategy strat, UniqueValuesFig uniqueValuesFig) {
+ public UniqueValuesTableImpl( final UniqueValueSerializationStrategy strat,
+ final UniqueValuesFig uniqueValuesFig,
+ final Session session ) {
this.strat = strat;
this.uniqueValuesFig = uniqueValuesFig;
+ this.session = session;
}
@@ -65,16 +72,16 @@
public void reserve( ApplicationScope scope, Id owner, UUID version, Field field ) throws ConnectionException {
UniqueValue uv = new UniqueValueImpl( field, owner, version);
- final MutationBatch write = strat.write( scope, uv, uniqueValuesFig.getUniqueValueReservationTtl() );
- write.execute();
+ final BatchStatement statement = strat.writeCQL( scope, uv, uniqueValuesFig.getUniqueValueReservationTtl() );
+ session.execute(statement);
}
@Override
public void confirm( ApplicationScope scope, Id owner, UUID version, Field field) throws ConnectionException {
UniqueValue uv = new UniqueValueImpl( field, owner, version);
- final MutationBatch write = strat.write( scope, uv );
- write.execute();
+ final BatchStatement statement = strat.writeCQL( scope, uv, -1 );
+ session.execute(statement);
}
@@ -82,8 +89,8 @@
public void cancel( ApplicationScope scope, Id owner, UUID version, Field field) throws ConnectionException {
UniqueValue uv = new UniqueValueImpl( field, owner, version );
- final MutationBatch write = strat.delete( scope, uv );
- write.execute();
+ final BatchStatement statement = strat.deleteCQL( scope, uv );
+ session.execute(statement);
}
@Override
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
index 89169ac..f98a3ea 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
@@ -1,7 +1,13 @@
package org.apache.usergrid.persistence.collection.mvcc.stage.delete;
+import com.datastax.driver.core.Session;
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
+import org.apache.usergrid.persistence.core.test.ITRunner;
+import org.apache.usergrid.persistence.core.test.UseModules;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.apache.usergrid.persistence.collection.MvccEntity;
@@ -32,6 +38,9 @@
/** @author tnine */
public class MarkCommitTest extends AbstractMvccEntityStageTest {
+ @Inject
+
+
/** Standard flow */
@Test
public void testStartStage() throws Exception {
@@ -39,6 +48,8 @@
final ApplicationScope context = mock( ApplicationScope.class );
+ final Session session = mock(Session.class);
+
//mock returning a mock mutation when we do a log entry write
final MvccLogEntrySerializationStrategy logStrategy = mock( MvccLogEntrySerializationStrategy.class );
@@ -71,7 +82,9 @@
//run the stage
- WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null);
+ WriteCommit newStage
+ = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null, session);
+
//verify the observable is correct
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
index dcc473c..df0fc9e 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
@@ -18,7 +18,13 @@
package org.apache.usergrid.persistence.collection.mvcc.stage.write;
+import com.datastax.driver.core.Session;
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
+import org.apache.usergrid.persistence.core.test.ITRunner;
+import org.apache.usergrid.persistence.core.test.UseModules;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.apache.usergrid.persistence.collection.MvccEntity;
@@ -53,6 +59,8 @@
final ApplicationScope context = mock( ApplicationScope.class );
+ final Session session = mock(Session.class);
+
//mock returning a mock mutation when we do a log entry write
final MvccLogEntrySerializationStrategy logStrategy = mock( MvccLogEntrySerializationStrategy.class );
@@ -85,7 +93,8 @@
//run the stage
WriteCommit newStage =
- new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null );
+ new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null, session );
+
Entity result = newStage.call(
@@ -118,6 +127,9 @@
/**
* Write up mock mutations so we don't npe on the our operations, but rather on the input
*/
+
+ final Session session = mock(Session.class);
+
final MvccLogEntrySerializationStrategy logStrategy = mock( MvccLogEntrySerializationStrategy.class );
final MutationBatch logMutation = mock( MutationBatch.class );
@@ -133,7 +145,8 @@
when( mvccEntityStrategy.write( any( ApplicationScope.class ), any( MvccEntity.class ) ) )
.thenReturn( entityMutation );
- new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null ).call( event );
+ new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null, session ).call( event );
+
}
}
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java
index 6a705e4..148cc09 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java
@@ -21,7 +21,12 @@
import java.util.ArrayList;
import java.util.List;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.core.test.ITRunner;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,8 +58,6 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-
-
@UseModules( TestCollectionModule.class )
public class WriteOptimisticVerifyTest extends AbstractMvccEntityStageTest {
@@ -110,6 +113,7 @@
when( scope.getApplication() )
.thenReturn( new SimpleId( UUIDGenerator.newTimeUUID(), "organization" ) );
+ final Session session = mock(Session.class);
// there is an entity
final Entity entity = generateEntity();
@@ -135,16 +139,13 @@
UniqueValueSerializationStrategy uvstrat = mock( UniqueValueSerializationStrategy.class);
UniqueValue uv1 = new UniqueValueImpl(entity.getField("name"), entity.getId(), entity.getVersion());
UniqueValue uv2 = new UniqueValueImpl( entity.getField("identifier"), entity.getId(), entity.getVersion());
- MutationBatch mb = mock( MutationBatch.class );
- when( uvstrat.delete(scope, uv1) ).thenReturn(mb);
- when( uvstrat.delete(scope, uv2) ).thenReturn(mb);
// Run the stage, conflict should be detected
final MvccEntity mvccEntity = fromEntity( entity );
boolean conflictDetected = false;
WriteOptimisticVerify newStage = new WriteOptimisticVerify( mvccLog );
- RollbackAction rollbackAction = new RollbackAction( mvccLog, uvstrat );
+ RollbackAction rollbackAction = new RollbackAction( mvccLog, uvstrat, session );
try {
newStage.call( new CollectionIoEvent<>(scope, mvccEntity));
@@ -157,8 +158,8 @@
assertTrue( conflictDetected );
// check that unique values were deleted
- verify( uvstrat, times(1) ).delete(scope, uv1 );
- verify( uvstrat, times(1) ).delete(scope, uv2 );
+ verify( uvstrat, times(1) ).deleteCQL(scope, uv1 );
+ verify( uvstrat, times(1) ).deleteCQL(scope, uv2 );
}
}
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java
index 401d23e..87226be 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java
@@ -18,6 +18,7 @@
package org.apache.usergrid.persistence.collection.mvcc.stage.write;
+import com.datastax.driver.core.Session;
import com.google.inject.Inject;
import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
import org.apache.usergrid.persistence.collection.*;
@@ -81,6 +82,9 @@
@Inject
UniqueValuesService uniqueValuesService;
+ @Inject
+ Session session;
+
@Before
public void initAkka() {
@@ -198,8 +202,8 @@
entityDuplicate.getId(), UUIDGenerator.newTimeUUID());
// manually insert a record to simulate a 'duplicate' trying to be inserted
- uniqueValueSerializationStrategy.
- write(scope, uniqueValue).execute();
+ session.execute(uniqueValueSerializationStrategy.
+ writeCQL(scope, uniqueValue, -1));
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
index 7afba05..1290a5c 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
@@ -18,6 +18,9 @@
package org.apache.usergrid.persistence.collection.mvcc.stage.write;
+
+import com.datastax.driver.core.Session;
+
import com.google.inject.Inject;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
@@ -25,13 +28,17 @@
import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
import org.apache.usergrid.persistence.collection.AbstractUniqueValueTest;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+
import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+
+import org.apache.usergrid.persistence.core.CassandraConfig;
+
import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService;
-import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+
import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.test.ITRunner;
@@ -62,6 +69,9 @@
public MigrationManagerRule migrationManagerRule;
@Inject
+ private Session session;
+
+ @Inject
private SerializationFig fig;
@Inject
@@ -96,7 +106,8 @@
final MvccEntity mvccEntity = fromEntity( entity );
// run the stage
- WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace, cassandraConfig, null, null, null );
+ WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace, cassandraConfig, null, null, null, session );
+
newStage.call( new CollectionIoEvent<>( collectionScope, mvccEntity ) ) ;
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImplTest.java
index f6720f9..ef3aabd 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImplTest.java
@@ -33,7 +33,7 @@
import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Test.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Test.java
index f8b65d9..2dca27d 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Test.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Test.java
@@ -36,7 +36,7 @@
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.util.EntityHelper;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.core.guicyfig.SetConfigTestBypass;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/SettingsValidationTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/SettingsValidationTest.java
index 273bec0..cf700f3 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/SettingsValidationTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/SettingsValidationTest.java
@@ -23,7 +23,7 @@
import org.junit.Test;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraFig;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
index 3dbf1ec..c4c083f 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
@@ -23,7 +23,10 @@
import java.util.Iterator;
import java.util.UUID;
-import com.netflix.astyanax.model.ConsistencyLevel;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -47,7 +50,6 @@
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import com.google.inject.Inject;
-import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import static org.junit.Assert.assertEquals;
@@ -64,6 +66,8 @@
@Rule
public MigrationManagerRule migrationManagerRule;
+ @Inject
+ private Session session;
private UniqueValueSerializationStrategy strategy;
@@ -91,7 +95,9 @@
Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
UUID version = UUIDGenerator.newTimeUUID();
UniqueValue stored = new UniqueValueImpl( field, entityId, version );
- strategy.write( scope, stored ).execute();
+ //strategy.write( scope, stored ).execute();
+ BatchStatement batch = strategy.writeCQL(scope, stored, -1);
+ session.execute(batch);
UniqueValueSet fields = strategy.load( scope, entityId.getType(), Collections.<Field>singleton( field ) );
@@ -127,7 +133,9 @@
Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
UUID version = UUIDGenerator.newTimeUUID();
UniqueValue stored = new UniqueValueImpl( field, entityId, version );
- strategy.write( scope, stored, 5 ).execute();
+ //strategy.write( scope, stored, 5 ).execute();
+ BatchStatement batch = strategy.writeCQL(scope, stored, 5);
+ session.execute(batch);
Thread.sleep( 1000 );
@@ -179,9 +187,15 @@
Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
UUID version = UUIDGenerator.newTimeUUID();
UniqueValue stored = new UniqueValueImpl( field, entityId, version );
- strategy.write( scope, stored ).execute();
- strategy.delete( scope, stored ).execute();
+ //strategy.write( scope, stored ).execute();
+ BatchStatement batch = strategy.writeCQL( scope, stored, -1);
+ session.execute(batch);
+
+
+ //strategy.delete( scope, stored ).execute();
+ BatchStatement deleteBatch = strategy.deleteCQL(scope, stored);
+ session.execute(deleteBatch);
UniqueValueSet fields = strategy.load( scope, entityId.getType(), Collections.<Field>singleton( field ) );
@@ -207,8 +221,9 @@
Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
UUID version = UUIDGenerator.newTimeUUID();
UniqueValue stored = new UniqueValueImpl( field, entityId, version );
- strategy.write( scope, stored ).execute();
-
+ //strategy.write( scope, stored ).execute();
+ BatchStatement batch = strategy.writeCQL( scope, stored, -1);
+ session.execute(batch);
UniqueValueSet fields = strategy.load( scope, entityId.getType(), Collections.<Field>singleton( field ) );
@@ -278,9 +293,13 @@
UniqueValue version1Field1Value = new UniqueValueImpl( version1Field1, entityId, version1 );
UniqueValue version1Field2Value = new UniqueValueImpl( version1Field2, entityId, version1 );
- final MutationBatch batch = strategy.write( scope, version1Field1Value );
- batch.mergeShallow( strategy.write( scope, version1Field2Value ) );
+ //final MutationBatch batch = strategy.write( scope, version1Field1Value );
+ //batch.mergeShallow( strategy.write( scope, version1Field2Value ) );
+ final BatchStatement batch = new BatchStatement();
+
+ batch.add(strategy.writeCQL( scope, version1Field1Value, -1));
+ batch.add(strategy.writeCQL( scope, version1Field2Value, -1));
//write V2 of everything
final UUID version2 = UUIDGenerator.newTimeUUID();
@@ -292,10 +311,15 @@
UniqueValue version2Field1Value = new UniqueValueImpl( version2Field1, entityId, version2 );
UniqueValue version2Field2Value = new UniqueValueImpl( version2Field2, entityId, version2 );
- batch.mergeShallow( strategy.write( scope, version2Field1Value ) );
- batch.mergeShallow( strategy.write( scope, version2Field2Value ) );
+ //batch.mergeShallow( strategy.write( scope, version2Field1Value ) );
+ //batch.mergeShallow( strategy.write( scope, version2Field2Value ) );
- batch.execute();
+ batch.add(strategy.writeCQL( scope, version2Field1Value, -1));
+ batch.add(strategy.writeCQL( scope, version2Field2Value, -1));
+
+ session.execute(batch);
+
+ //batch.execute();
UniqueValueSet fields = strategy.load( scope, entityId.getType(), Arrays.<Field>asList( version1Field1, version1Field2 ) );
@@ -366,11 +390,11 @@
UniqueValue stored2 = new UniqueValueImpl( field, entityId2, version1 );
- strategy.write( scope, stored1 ).execute();
- strategy.write( scope, stored2 ).execute();
+ session.execute(strategy.writeCQL( scope, stored1, -1 ));
+ session.execute(strategy.writeCQL( scope, stored2, -1 ));
// load descending to get the older version of entity for this unique value
- UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
+ UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.LOCAL_QUORUM,
entityId1.getType(), Collections.<Field>singleton( field ), true);
UniqueValue retrieved = fields.getValue( field.getName() );
@@ -382,10 +406,10 @@
UUID version3 = UUIDGenerator.newTimeUUID();
UniqueValue stored3 = new UniqueValueImpl( field, entityId2, version3);
- strategy.write( scope, stored3 ).execute();
+ session.execute(strategy.writeCQL( scope, stored3, -1 ));
// load the values again, we should still only get back the original unique value
- fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
+ fields = strategy.load( scope, ConsistencyLevel.LOCAL_QUORUM,
entityId1.getType(), Collections.<Field>singleton( field ), true);
retrieved = fields.getValue( field.getName() );
@@ -396,10 +420,10 @@
UUID version4 = UUIDGenerator.newTimeUUID();
UniqueValue stored4 = new UniqueValueImpl( field, entityId1, version4);
- strategy.write( scope, stored4 ).execute();
+ session.execute(strategy.writeCQL( scope, stored4, -1 ));
// load the values again, now we should get the latest version of the original UUID written
- fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
+ fields = strategy.load( scope, ConsistencyLevel.LOCAL_QUORUM,
entityId1.getType(), Collections.<Field>singleton( field ), true);
retrieved = fields.getValue( field.getName() );
@@ -433,11 +457,11 @@
UniqueValue stored2 = new UniqueValueImpl( field, entityId1, version2 );
- strategy.write( scope, stored1 ).execute();
- strategy.write( scope, stored2 ).execute();
+ session.execute(strategy.writeCQL( scope, stored1, -1 ));
+ session.execute(strategy.writeCQL( scope, stored2, -1 ));
// load descending to get the older version of entity for this unique value
- UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
+ UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.LOCAL_QUORUM,
entityId1.getType(), Collections.<Field>singleton( field ), true);
UniqueValue retrieved = fields.getValue( field.getName() );
@@ -469,19 +493,16 @@
UniqueValue stored3 = new UniqueValueImpl( field, entityId1, version3 );
- strategy.write( scope, stored1 ).execute();
- strategy.write( scope, stored2 ).execute();
- strategy.write( scope, stored3 ).execute();
+ session.execute(strategy.writeCQL( scope, stored1, -1 ));
+ session.execute(strategy.writeCQL( scope, stored2, -1 ));
+ session.execute(strategy.writeCQL( scope, stored3, -1 ));
// load descending to get the older version of entity for this unique value
- UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
+ UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.LOCAL_QUORUM,
entityId1.getType(), Collections.<Field>singleton( field ), true);
- fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
- entityId1.getType(), Collections.<Field>singleton( field ), false);
-
UniqueValue retrieved = fields.getValue( field.getName() );
assertEquals( stored3, retrieved );
@@ -510,14 +531,14 @@
UniqueValue stored3 = new UniqueValueImpl( field, entityId3, version3 );
- strategy.write( scope, stored1 ).execute();
- strategy.write( scope, stored2 ).execute();
- strategy.write( scope, stored3 ).execute();
+ session.execute(strategy.writeCQL( scope, stored1, -1 ));
+ session.execute(strategy.writeCQL( scope, stored2, -1 ));
+ session.execute(strategy.writeCQL( scope, stored3, -1 ));
// load descending to get the older version of entity for this unique value
UniqueValueSet fields = strategy.load( scope,
- ConsistencyLevel.CL_LOCAL_QUORUM, entityId1.getType(), Collections.<Field>singleton( field ), true);
+ ConsistencyLevel.LOCAL_QUORUM, entityId1.getType(), Collections.<Field>singleton( field ), true);
UniqueValue retrieved = fields.getValue( field.getName() );
assertEquals( stored1, retrieved );
@@ -552,15 +573,15 @@
- strategy.write( scope, stored1 ).execute();
- strategy.write( scope, stored2 ).execute();
- strategy.write( scope, stored3 ).execute();
- strategy.write( scope, stored4 ).execute();
- strategy.write( scope, stored5 ).execute();
+ session.execute(strategy.writeCQL( scope, stored1, -1 ));
+ session.execute(strategy.writeCQL( scope, stored2, -1 ));
+ session.execute(strategy.writeCQL( scope, stored3, -1 ));
+ session.execute(strategy.writeCQL( scope, stored4, -1 ));
+ session.execute(strategy.writeCQL( scope, stored5, -1 ));
// load descending to get the older version of entity for this unique value
- UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.CL_LOCAL_QUORUM,
+ UniqueValueSet fields = strategy.load( scope, ConsistencyLevel.LOCAL_QUORUM,
entityId1.getType(), Collections.<Field>singleton( field ), true);
UniqueValue retrieved = fields.getValue( field.getName() );
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java
index 9be979b..046aaf5 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java
@@ -22,6 +22,7 @@
import java.util.UUID;
+import com.datastax.driver.core.Session;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -87,6 +88,8 @@
@Inject
public Keyspace keyspace;
@Inject
+ public Session session;
+ @Inject
public VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions;
@Inject
public MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3;
@@ -148,7 +151,7 @@
assertEquals( "Same instance for to", v3Impl.getClass(), tuple.to.getClass() );
- MvccEntityDataMigrationImpl mvccEntityDataMigrationImpl = new MvccEntityDataMigrationImpl(keyspace, allVersions, mvccEntitySerializationStrategyV3, uniqueValueSerializationStrategy, mvccLogEntrySerializationStrategy, migrationProvider);
+ MvccEntityDataMigrationImpl mvccEntityDataMigrationImpl = new MvccEntityDataMigrationImpl(keyspace, session, allVersions, mvccEntitySerializationStrategyV3, uniqueValueSerializationStrategy, mvccLogEntrySerializationStrategy, migrationProvider);
//now migration
diff --git a/stack/corepersistence/collection/src/test/resources/dynamic-test.properties b/stack/corepersistence/collection/src/test/resources/dynamic-test.properties
index 6b6f551..ca203f9 100644
--- a/stack/corepersistence/collection/src/test/resources/dynamic-test.properties
+++ b/stack/corepersistence/collection/src/test/resources/dynamic-test.properties
@@ -1,12 +1,10 @@
# The properties are not the actual configuration properties but
# safe dynamic property defaults for our testing via IDE or Maven
-cassandra.connections=10
+cassandra.connections=50
cassandra.port=9160
-cassandra.version=1.2
cassandra.hosts=localhost
cassandra.cluster_name=Usergrid
collections.keyspace=Usergrid_Collections
-cassandra.timeout=5000
collection.stage.transient.timeout=5
# This property is required to be set and cannot be defaulted anywhere
diff --git a/stack/corepersistence/collection/src/test/resources/usergrid-CHOP.properties b/stack/corepersistence/collection/src/test/resources/usergrid-CHOP.properties
index d404b1e..ebae735 100644
--- a/stack/corepersistence/collection/src/test/resources/usergrid-CHOP.properties
+++ b/stack/corepersistence/collection/src/test/resources/usergrid-CHOP.properties
@@ -18,16 +18,14 @@
# These are for CHOP environment settings
-cassandra.connections=20
+cassandra.connections=50
cassandra.port=9160
-cassandra.version=1.2
# a comma delimited private IP address list to your chop cassandra cluster
# define this in your settings.xml and have it as an always active profile
cassandra.hosts=${chop.cassandra.hosts}
cassandra.cluster_name=Usergrid
collections.keyspace=Usergrid_Collections
-cassandra.timeout=5000
# This property is required to be set and cannot be defaulted anywhere
usergrid.cluster_name=usergrid
diff --git a/stack/corepersistence/collection/src/test/resources/usergrid-UNIT.properties b/stack/corepersistence/collection/src/test/resources/usergrid-UNIT.properties
index 81a4c17..964bbbf 100644
--- a/stack/corepersistence/collection/src/test/resources/usergrid-UNIT.properties
+++ b/stack/corepersistence/collection/src/test/resources/usergrid-UNIT.properties
@@ -20,11 +20,9 @@
# Keep nothing but overriding test defaults in here
cassandra.connections=50
cassandra.port=9160
-cassandra.version=1.2
cassandra.hosts=localhost
cassandra.cluster_name=Usergrid
collections.keyspace=Usergrid_Collections
-cassandra.timeout=5000
cassandra.embedded=true
diff --git a/stack/corepersistence/common/pom.xml b/stack/corepersistence/common/pom.xml
index bbcadff..63d339b 100644
--- a/stack/corepersistence/common/pom.xml
+++ b/stack/corepersistence/common/pom.xml
@@ -73,6 +73,10 @@
<version>${cassandra.version}</version>
<exclusions>
<exclusion>
+ <groupId>net.jpountz.lz4</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
<artifactId>netty</artifactId>
<groupId>io.netty</groupId>
</exclusion>
@@ -197,6 +201,29 @@
<artifactId>metrics-graphite</artifactId>
<version>${metrics.version}</version>
</dependency>
+
+ <!-- Use the shaded jar dependency so we don't conflict with other Netty versions in the application -->
+ <dependency>
+ <groupId>com.datastax.cassandra</groupId>
+ <artifactId>cassandra-driver-core</artifactId>
+ <version>${datastax.version}</version>
+ <classifier>shaded</classifier>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- LZ4 compression used for Datastax Java Driver:
+ https://datastax.github.io/java-driver/2.1.7/features/compression/ -->
+ <dependency>
+ <groupId>net.jpountz.lz4</groupId>
+ <artifactId>lz4</artifactId>
+ <version>1.2.0</version>
+ </dependency>
+
</dependencies>
<profiles>
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java
similarity index 72%
rename from stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
rename to stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java
index dba3646..595b65f 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.usergrid.persistence.core.astyanax;
+package org.apache.usergrid.persistence.core;
import com.netflix.astyanax.model.ConsistencyLevel;
@@ -37,7 +37,7 @@
ConsistencyLevel getReadCL();
/**
- * Get the currently configured ReadCL that is more consitent than getReadCL
+ * Get the currently configured ReadCL that is more consistent than getReadCL
* @return
*/
ConsistencyLevel getConsistentReadCL();
@@ -48,6 +48,25 @@
*/
ConsistencyLevel getWriteCL();
+
+ /**
+ * Get the currently configured read CL for DataStax driver
+ * @return
+ */
+ com.datastax.driver.core.ConsistencyLevel getDataStaxReadCl();
+
+ /**
+ * Get the currently configured write CL for DataStax driver
+ * @return
+ */
+ com.datastax.driver.core.ConsistencyLevel getDataStaxWriteCl();
+
+ /**
+ * Get the currently configured consistent read CL for DataStax driver
+ * @return
+ */
+ com.datastax.driver.core.ConsistencyLevel getDataStaxReadConsistentCl();
+
/**
* Return the number of shards that has been set in the property file
* @return
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
similarity index 70%
rename from stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java
rename to stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
index 7373322..e87ebb8 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.usergrid.persistence.core.astyanax;
+package org.apache.usergrid.persistence.core;
import java.beans.PropertyChangeEvent;
@@ -41,17 +41,29 @@
private int[] shardSettings;
private ConsistencyLevel consistentCl;
+ // DataStax driver's CL
+ private com.datastax.driver.core.ConsistencyLevel dataStaxReadCl;
+ private com.datastax.driver.core.ConsistencyLevel dataStaxWriteCl;
+ private com.datastax.driver.core.ConsistencyLevel dataStaxReadConsistentCl;
+
+
@Inject
public CassandraConfigImpl( final CassandraFig cassandraFig ) {
- this.readCl = ConsistencyLevel.valueOf( cassandraFig.getReadCL() );
+ this.readCl = ConsistencyLevel.valueOf( cassandraFig.getAstyanaxReadCL() );
- this.writeCl = ConsistencyLevel.valueOf( cassandraFig.getWriteCL() );
+ this.writeCl = ConsistencyLevel.valueOf( cassandraFig.getAstyanaxWriteCL() );
this.shardSettings = parseShardSettings( cassandraFig.getShardValues() );
- this.consistentCl = ConsistencyLevel.valueOf(cassandraFig.getConsistentReadCL());
+ this.consistentCl = ConsistencyLevel.valueOf(cassandraFig.getAstyanaxConsistentReadCL());
+
+ this.dataStaxReadCl = com.datastax.driver.core.ConsistencyLevel.valueOf( cassandraFig.getReadCl());
+
+ this.dataStaxReadConsistentCl = com.datastax.driver.core.ConsistencyLevel.valueOf( cassandraFig.getReadClConsistent());
+
+ this.dataStaxWriteCl = com.datastax.driver.core.ConsistencyLevel.valueOf( cassandraFig.getWriteCl() );
//add the listeners to update the values
cassandraFig.addPropertyChangeListener( new PropertyChangeListener() {
@@ -59,11 +71,11 @@
public void propertyChange( final PropertyChangeEvent evt ) {
final String propName = evt.getPropertyName();
- if ( CassandraFig.READ_CL.equals( propName ) ) {
+ if ( CassandraFig.ASTYANAX_READ_CL.equals( propName ) ) {
readCl = ConsistencyLevel.valueOf( evt.getNewValue().toString() );
}
- else if ( CassandraFig.WRITE_CL.equals( propName ) ) {
+ else if ( CassandraFig.ASTYANAX_WRITE_CL.equals( propName ) ) {
writeCl = ConsistencyLevel.valueOf( evt.getNewValue().toString() );
}
else if (CassandraFig.SHARD_VALUES.equals(propName)){
@@ -89,6 +101,21 @@
return writeCl;
}
+ @Override
+ public com.datastax.driver.core.ConsistencyLevel getDataStaxReadCl() {
+ return dataStaxReadCl;
+ }
+
+ @Override
+ public com.datastax.driver.core.ConsistencyLevel getDataStaxWriteCl() {
+ return dataStaxWriteCl;
+ }
+
+ @Override
+ public com.datastax.driver.core.ConsistencyLevel getDataStaxReadConsistentCl() {
+ return dataStaxReadConsistentCl;
+ }
+
@Override
public int[] getShardSettings() {
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
similarity index 79%
rename from stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
rename to stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
index d315561..2996465 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.usergrid.persistence.core.astyanax;
+package org.apache.usergrid.persistence.core;
import org.safehaus.guicyfig.Default;
@@ -25,17 +25,23 @@
import org.safehaus.guicyfig.Key;
-
/**
* Cassandra configuration interface.
*/
@FigSingleton
public interface CassandraFig extends GuicyFig {
+ // Cassandra properties used by the DataStax driver
+ String READ_CL = "cassandra.readcl";
+ String READ_CL_CONSISTENT = "cassandra.readcl.consistent";
+ String WRITE_CL = "cassandra.writecl";
+ String STRATEGY = "cassandra.strategy";
+ String STRATEGY_OPTIONS = "cassandra.strategy.options";
+
// main application cassandra properties
- String READ_CONSISTENT_CL = "usergrid.consistent.read.cl";
- String READ_CL = "usergrid.read.cl";
- String WRITE_CL = "usergrid.write.cl";
+ String ASTYANAX_READ_CONSISTENT_CL = "usergrid.consistent.read.cl";
+ String ASTYANAX_READ_CL = "usergrid.read.cl";
+ String ASTYANAX_WRITE_CL = "usergrid.write.cl";
String SHARD_VALUES = "cassandra.shardvalues";
String THRIFT_TRANSPORT_SIZE = "cassandra.thrift.transport.frame";
String USERNAME = "cassandra.username";
@@ -56,11 +62,19 @@
// re-usable default values
String DEFAULT_CONNECTION_POOLSIZE = "15";
String DEFAULT_LOCKS_EXPIRATION = "3600000"; // 1 hour
+ String DEFAULT_LOCAL_DC = "";
+ String DEFAULT_USERNAME = "";
+ String DEFAULT_PASSWORD = "";
@Key( "cassandra.hosts" )
String getHosts();
+ /**
+ * The version of the Cassandra cluster in use. Valid options are 1.2, 2.0, 2.1
+ *
+ * @return the configured Cassandra version
+ */
@Key( "cassandra.version" )
@Default( "2.1" )
String getVersion();
@@ -77,15 +91,18 @@
@Default( "9160" )
int getThriftPort();
- @Key( "cassandra.datacenter.local" )
- String getLocalDataCenter();
-
@Key( USERNAME )
+ @Default( DEFAULT_USERNAME )
String getUsername();
@Key( PASSWORD )
+ @Default( DEFAULT_PASSWORD )
String getPassword();
+ @Key( "cassandra.datacenter.local" )
+ @Default( DEFAULT_LOCAL_DC )
+ String getLocalDataCenter();
+
@Key( "cassandra.connections" )
@Default( DEFAULT_CONNECTION_POOLSIZE )
int getConnections();
@@ -94,22 +111,47 @@
@Default( "10000" )
int getTimeout();
+ @Key( "cassandra.timeout.pool" )
+ @Default( "5000" )
+ int getPoolTimeout();
+
@Key("cassandra.discovery")
@Default( "RING_DESCRIBE" )
String getDiscoveryType();
@Default("CL_LOCAL_QUORUM")
- @Key(READ_CL)
- String getReadCL();
+ @Key(ASTYANAX_READ_CL)
+ String getAstyanaxReadCL();
@Default("CL_QUORUM")
- @Key(READ_CONSISTENT_CL)
- String getConsistentReadCL();
+ @Key(ASTYANAX_READ_CONSISTENT_CL)
+ String getAstyanaxConsistentReadCL();
@Default("CL_LOCAL_QUORUM")
+ @Key(ASTYANAX_WRITE_CL)
+ String getAstyanaxWriteCL();
+
+
+ @Default("LOCAL_QUORUM")
+ @Key(READ_CL)
+ String getReadCl();
+
+ @Default("QUORUM")
+ @Key(READ_CL_CONSISTENT)
+ String getReadClConsistent();
+
+ @Default("LOCAL_QUORUM")
@Key(WRITE_CL)
- String getWriteCL();
+ String getWriteCl();
+
+ @Default("SimpleStrategy")
+ @Key( STRATEGY )
+ String getStrategy();
+
+ @Default("replication_factor:1")
+ @Key( STRATEGY_OPTIONS )
+ String getStrategyOptions();
/**
* Return the history of all shard values which are immutable. For instance, if shard values
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraClusterImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraClusterImpl.java
index cd42c5b..9f5d4ac 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraClusterImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraClusterImpl.java
@@ -36,6 +36,8 @@
import com.netflix.astyanax.connectionpool.impl.Slf4jConnectionPoolMonitorImpl;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;
+import org.apache.usergrid.persistence.core.CassandraConfig;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java
new file mode 100644
index 0000000..0d6a312
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.core.datastax;
+
+import com.datastax.driver.core.DataType;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.util.StringUtils;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class CQLUtils {
+
+ private final CassandraFig cassandraFig;
+ private final static ObjectMapper mapper = new ObjectMapper();
+
+ public enum ACTION {
+ CREATE, UPDATE
+ }
+    // CQL keywords and punctuation used to assemble statements; final so they cannot be reassigned at runtime
+    static final String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS";
+    static final String ALTER_TABLE = "ALTER TABLE";
+    static final String WITH = "WITH";
+    static final String AND = "AND";
+    static final String EQUAL = "=";
+    static final String COMPRESSION = "compression";
+    static final String COMPACTION = "compaction";
+    static final String CACHING = "caching";
+    static final String GC_GRACE_SECONDS = "gc_grace_seconds";
+    static final String PRIMARY_KEY = "PRIMARY KEY";
+    static final String COMPACT_STORAGE = "COMPACT STORAGE";
+    static final String CLUSTERING_ORDER_BY = "CLUSTERING ORDER BY";
+    static final String COMMA = ",";
+    static final String PAREN_LEFT = "(";
+    static final String PAREN_RIGHT = ")";
+
+ static String COMPOSITE_TYPE = "'org.apache.cassandra.db.marshal.DynamicCompositeType(a=>org.apache.cassandra.db.marshal.AsciiType,A=>org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.AsciiType),b=>org.apache.cassandra.db.marshal.BytesType,B=>org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.BytesType),i=>org.apache.cassandra.db.marshal.IntegerType,I=>org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.IntegerType),l=>org.apache.cassandra.db.marshal.LongType,L=>org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.LongType),s=>org.apache.cassandra.db.marshal.UTF8Type,S=>org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.UTF8Type),t=>org.apache.cassandra.db.marshal.TimeUUIDType,T=>org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.TimeUUIDType),u=>org.apache.cassandra.db.marshal.UUIDType,U=>org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.UUIDType),x=>org.apache.cassandra.db.marshal.LexicalUUIDType,X=>org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.LexicalUUIDType))'";
+
+ @Inject
+ public CQLUtils ( final CassandraFig cassandraFig ){
+
+ this.cassandraFig = cassandraFig;
+
+ }
+    /** Builds the CQL replication map literal from a strategy class and comma-separated key:value options.
+     *  Whitespace around pairs is ignored; anything that is not one key:value pair raises IllegalArgumentException. */
+    public static String getFormattedReplication(String strategy, String strategyOptions) throws JsonProcessingException {
+        Map<String, String> replicationSettings = new HashMap<>();
+        replicationSettings.put("class", strategy);
+        for ( String option : strategyOptions.split(",") ){
+            String[] splitOptions = option.trim().split(":");
+            if ( splitOptions.length != 2 ){
+                throw new IllegalArgumentException("Invalid strategy option, expected key:value but got: " + option);
+            }
+            replicationSettings.put(splitOptions[0].trim(), splitOptions[1].trim());
+        }
+        return mapper.writeValueAsString(replicationSettings).replace("\"", "'");
+    }
+
+    public static String getMapAsCQLString(Map<String, Object> map) throws JsonProcessingException {
+        final String json = mapper.writeValueAsString(map); // serialize as JSON, then swap the quote character
+        return json.replace("\"", "'");
+    }
+
+    /**
+     * Builds the CQL statement that creates (ACTION.CREATE) or alters (ACTION.UPDATE) the given table.
+     * CREATE emits the columns, the primary key, the clustering order when one is defined, and COMPACT
+     * STORAGE; both actions then append the compaction, compression, gc_grace_seconds and caching options.
+     *
+     * @param cassandraFig configuration whose Cassandra version selects the caching syntax
+     * @param tableDefinition the table to generate CQL for
+     * @param tableAction whether to generate CREATE TABLE or ALTER TABLE
+     * @return the CQL statement, with its tokens separated by single spaces
+     * @throws IllegalArgumentException if tableAction is null
+     * @throws Exception if a table option cannot be serialized
+     */
+    public static String getTableCQL( CassandraFig cassandraFig, TableDefinition tableDefinition,
+                                      ACTION tableAction) throws Exception {
+        // '==' is null-safe, so a null action is rejected here instead of failing with a NullPointerException
+        final boolean create = tableAction == ACTION.CREATE;
+        if ( !create && tableAction != ACTION.UPDATE ){
+            throw new IllegalArgumentException("Invalid Action specified. Must be of type CQLUtils.ACTION");
+        }
+
+        StringJoiner cql = new StringJoiner(" ");
+        cql.add( create ? CREATE_TABLE : ALTER_TABLE ).add( tableDefinition.getTableName() );
+
+        if ( create ){
+            cql.add(PAREN_LEFT).add( spaceSeparatedKeyValue(tableDefinition.getColumns()) ).add(COMMA)
+                .add(PRIMARY_KEY)
+                .add(PAREN_LEFT).add(PAREN_LEFT)
+                .add( StringUtils.join(tableDefinition.getPartitionKeys(), COMMA) ).add(PAREN_RIGHT);
+
+            if ( tableDefinition.getColumnKeys() != null && !tableDefinition.getColumnKeys().isEmpty() ){
+                cql.add(COMMA).add( StringUtils.join(tableDefinition.getColumnKeys(), COMMA) );
+            }
+
+            cql.add(PAREN_RIGHT).add(PAREN_RIGHT).add(WITH);
+
+            // an empty "CLUSTERING ORDER BY ( )" is not valid CQL, so the clause is only emitted when an order exists
+            final Map<String, String> clusteringOrder = tableDefinition.getClusteringOrder();
+            if ( clusteringOrder != null && !clusteringOrder.isEmpty() ){
+                cql.add(CLUSTERING_ORDER_BY)
+                    .add(PAREN_LEFT).add( spaceSeparatedKeyValue(clusteringOrder) ).add(PAREN_RIGHT)
+                    .add(AND);
+            }
+
+            cql.add(COMPACT_STORAGE).add(AND);
+        } else {
+            cql.add(WITH);
+        }
+
+        cql.add(COMPACTION).add(EQUAL).add( getMapAsCQLString( tableDefinition.getCompaction() ) )
+            .add(AND)
+            .add(COMPRESSION).add(EQUAL).add( getMapAsCQLString( tableDefinition.getCompression() ) )
+            .add(AND)
+            .add(GC_GRACE_SECONDS).add(EQUAL).add( tableDefinition.getGcGraceSeconds() )
+            .add(AND)
+            .add(CACHING).add(EQUAL).add( getCachingOptions( cassandraFig, tableDefinition.getCacheOption() ) );
+        return cql.toString();
+    }
+
+    /** Surrounds the given value with double quotes. */
+    public static String quote( String value){
+        final String doubleQuote = "\"";
+        return doubleQuote + value + doubleQuote;
+    }
+
+    /**
+     * Renders each map entry as "key value" and joins the entries with commas (despite the name, the
+     * space only separates a key from its value). A value of DataType.Name.CUSTOM is rendered as the
+     * DynamicCompositeType class string instead of its own name.
+     */
+    public static String spaceSeparatedKeyValue(Map<String, ?> columns){
+
+        final StringJoiner joined = new StringJoiner(",");
+        for ( Map.Entry<String, ?> column : columns.entrySet() ){
+            final Object type = column.getValue();
+            final String rendered = type == DataType.Name.CUSTOM ? COMPOSITE_TYPE : String.valueOf(type);
+            joined.add(column.getKey() + " " + rendered);
+        }
+        return joined.toString();
+    }
+
+
+    /**
+     * Returns the value of the table "caching" option in the syntax of the configured Cassandra version.
+     * NOTE(review): the legacy value is wrapped in double quotes via quote(); CQL string constants are
+     * normally single-quoted — confirm this form is accepted by Cassandra 2.0 and below.
+     */
+    public static String getCachingOptions(CassandraFig cassandraFig, TableDefinition.CacheOption cacheOption) throws JsonProcessingException {
+        // Cassandra 2.0 and below has a different CQL syntax for caching
+        final boolean legacySyntax = Double.parseDouble( cassandraFig.getVersion() ) <= 2.0;
+        if ( !legacySyntax ){
+            return getCacheValue( cacheOption );
+        }
+        return quote( getLegacyCacheValue( cacheOption ) );
+    }
+
+
+    /**
+     * Builds the map-style "caching" table option used by Cassandra versions newer than 2.0.
+     * The map always carries a "keys" and a "rows_per_partition" entry, each set to ALL or NONE.
+     *
+     * @param cacheOption which of keys and rows should be cached
+     * @return the caching map rendered as a CQL map literal
+     * @throws JsonProcessingException if the map cannot be serialized
+     */
+    public static String getCacheValue( TableDefinition.CacheOption cacheOption ) throws JsonProcessingException {
+
+        // start from "cache nothing" and switch on only what the option asks for
+        String keys = "NONE";
+        String rowsPerPartition = "NONE";
+
+        switch (cacheOption) {
+            case ALL:
+                keys = "ALL";
+                rowsPerPartition = "ALL";
+                break;
+            case KEYS:
+                keys = "ALL";
+                break;
+            case ROWS:
+                rowsPerPartition = "ALL";
+                break;
+            default:
+                // NONE, like any unrecognized option, leaves both caches off
+                break;
+        }
+
+        Map<String, Object> cacheValue = new HashMap<>(2);
+        cacheValue.put("keys", keys);
+        cacheValue.put("rows_per_partition", rowsPerPartition);
+        return getMapAsCQLString( cacheValue );
+    }
+
+    /**
+     * Maps a cache option to the single-keyword caching value used by Cassandra 2.0 and below.
+     * A null option fails with a NullPointerException raised by the switch.
+     *
+     * @param cacheOption which of keys and rows should be cached
+     * @return "all", "keys_only", "rows_only" or "none"
+     */
+    public static String getLegacyCacheValue( TableDefinition.CacheOption cacheOption ){
+
+        switch (cacheOption) {
+
+            case ALL:
+                return "all";
+
+            case KEYS:
+                return "keys_only";
+
+            case ROWS:
+                return "rows_only";
+
+            case NONE:
+            default:
+                // anything that is not ALL, KEYS or ROWS is treated like NONE
+                return "none";
+
+        }
+
+    }
+
+
+
+ /**
+ * Below functions borrowed from Astyanax until the schema is re-written to be more CQL friendly
+ */
+    /** Reads the next two bytes of the buffer as an unsigned big-endian 16-bit value, advancing its position by 2. */
+    public static int getShortLength(ByteBuffer bb) {
+        final int high = bb.get() & 0xFF;
+        return (high << 8) | (bb.get() & 0xFF);
+    }
+    /** Returns a view of the next length bytes that shares content with bb, and advances bb past them. */
+    public static ByteBuffer getBytes(ByteBuffer bb, int length) {
+        final ByteBuffer view = bb.duplicate();
+        view.limit(view.position() + length);
+        bb.position(view.limit());
+        return view;
+    }
+    /** Reads a two-byte length prefix from bb, then returns a view of that many following bytes. */
+    public static ByteBuffer getWithShortLength(ByteBuffer bb) {
+        final int prefixedLength = getShortLength(bb);
+        return getBytes(bb, prefixedLength);
+    }
+
+
+}
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DataStaxCluster.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DataStaxCluster.java
new file mode 100644
index 0000000..768a7a2
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DataStaxCluster.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.core.datastax;
+
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+/** Access point to the Cassandra cluster through the DataStax Java driver: the cluster, its sessions and keyspace setup. */
+public interface DataStaxCluster {
+ /** Returns the DataStax driver Cluster. */
+ Cluster getCluster();
+ /** Returns a Session for cluster-level statements; DataStaxClusterImpl connects it without a keyspace. */
+ Session getClusterSession();
+ /** Returns a Session for application data; DataStaxClusterImpl connects it to the application keyspace. */
+ Session getApplicationSession();
+ /** Creates the application keyspace if it does not exist and alters it if it does. */
+ void createOrUpdateKeyspace() throws Exception;
+ /** Blocks until all Cassandra nodes agree on the schema. */
+ void waitForSchemaAgreement();
+
+}
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DataStaxSessionProvider.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DataStaxSessionProvider.java
new file mode 100644
index 0000000..5e9a633
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DataStaxSessionProvider.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.core.datastax;
+
+
+import com.datastax.driver.core.Session;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+/** Guice provider that hands out the application Session obtained from the DataStaxCluster. */
+@Singleton
+public class DataStaxSessionProvider implements Provider<Session> {
+ // source of the sessions this provider hands out
+ private final DataStaxCluster dataStaxCluster;
+
+ @Inject
+ public DataStaxSessionProvider( final DataStaxCluster dataStaxCluster ){
+
+ this.dataStaxCluster = dataStaxCluster;
+ }
+
+ @Override
+ public Session get(){
+ // delegate on every call rather than caching the Session here
+ return dataStaxCluster.getApplicationSession();
+ }
+}
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/TableDefinition.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/TableDefinition.java
new file mode 100644
index 0000000..d749d40
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/TableDefinition.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.core.datastax;
+
+
+import com.datastax.driver.core.DataType;
+import com.google.common.base.Preconditions;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TableDefinition {
+
+
+ public enum CacheOption {
+
+ ALL, KEYS, ROWS, NONE
+ }
+
+
+ private final String tableName;
+ private final Collection<String> partitionKeys;
+ private final Collection<String> columnKeys;
+ private final Map<String, DataType.Name> columns;
+ private final CacheOption cacheOption;
+ private final Map<String, Object> compaction;
+ private final String bloomFilterChance;
+ private final String readRepairChance;
+ private final Map<String, Object> compression;
+ private final String gcGraceSeconds;
+ private final Map<String, String> clusteringOrder;
+ /** Creates a table definition from the caller-supplied schema plus fixed compaction, compression and GC defaults. */
+ public TableDefinition( final String tableName, final Collection<String> partitionKeys,
+ final Collection<String> columnKeys, final Map<String, DataType.Name> columns,
+ final CacheOption cacheOption, final Map<String, String> clusteringOrder){
+ // columnKeys and clusteringOrder are not checked here and may therefore be null
+ Preconditions.checkNotNull(tableName, "Table name cannot be null");
+ Preconditions.checkNotNull(partitionKeys, "Primary Key(s) cannot be null");
+ Preconditions.checkNotNull(columns, "Columns cannot be null");
+ Preconditions.checkNotNull(cacheOption, "CacheOption cannot be null");
+
+ // the collections and maps are stored by reference, not copied
+ this.tableName = tableName;
+ this.partitionKeys = partitionKeys;
+ this.columnKeys = columnKeys;
+ this.columns = columns;
+ this.cacheOption = cacheOption;
+ this.clusteringOrder = clusteringOrder;
+
+
+ // these are default settings that are always used
+ this.compaction = new HashMap<>(1);
+ compaction.put( "class", "LeveledCompactionStrategy" );
+ this.bloomFilterChance = "0.1d";
+ this.readRepairChance = "0.1d";
+ this.compression = new HashMap<>(1);
+ compression.put("sstable_compression", "LZ4Compressor");
+ this.gcGraceSeconds = "864000";
+ // 864000 seconds is 10 days
+
+
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public Collection<String> getPartitionKeys() {
+ return partitionKeys;
+ }
+
+ public Collection<String> getColumnKeys() {
+ return columnKeys;
+ }
+
+ public Map<String, DataType.Name> getColumns() {
+ return columns;
+ }
+
+ public CacheOption getCacheOption() {
+ return cacheOption;
+ }
+
+ public Map<String, Object> getCompaction() {
+ return compaction;
+ }
+
+ public String getBloomFilterChance() {
+ return bloomFilterChance;
+ }
+
+ public String getReadRepairChance() {
+ return readRepairChance;
+ }
+
+ public Map<String, Object> getCompression() {
+ return compression;
+ }
+
+ public String getGcGraceSeconds() {
+ return gcGraceSeconds;
+ }
+
+ public Map<String, String> getClusteringOrder() {
+ return clusteringOrder;
+ }
+
+
+}
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
new file mode 100644
index 0000000..c7b736f
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.core.datastax.impl;
+
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.datastax.CQLUtils;
+import org.apache.usergrid.persistence.core.datastax.DataStaxCluster;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Singleton
+public class DataStaxClusterImpl implements DataStaxCluster {
+
+ private static final Logger logger = LoggerFactory.getLogger( DataStaxClusterImpl.class );
+
+
+ private final CassandraFig cassandraFig;
+ private Cluster cluster;
+ private Session applicationSession;
+ private Session clusterSession;
+ /** Builds the cluster client from the given configuration, then creates or updates the application keyspace. */
+ @Inject
+ public DataStaxClusterImpl(final CassandraFig cassandraFig ) throws Exception {
+ this.cassandraFig = cassandraFig;
+ this.cluster = buildCluster();
+
+ // always initialize the keyspaces
+ this.createOrUpdateKeyspace();
+ // the pool timeout is held in milliseconds; integer division logs it in whole seconds
+ logger.info("Initialized datastax cluster client. Hosts={}, Idle Timeout={}s, Pool Timeout={}s",
+ cluster.getMetadata().getAllHosts().toString(),
+ cluster.getConfiguration().getPoolingOptions().getIdleTimeoutSeconds(),
+ cluster.getConfiguration().getPoolingOptions().getPoolTimeoutMillis() / 1000);
+
+
+ }
+ /** Returns the current Cluster, building a replacement first if the current one has been closed. */
+ @Override
+ public Cluster getCluster(){
+ // NOTE(review): this check-then-replace is not synchronized — confirm callers cannot race here
+ // ensure we can build the cluster if it was previously closed
+ if ( cluster.isClosed() ){
+ cluster = buildCluster();
+ }
+
+ return cluster;
+ }
+ /** Returns the Session that is not bound to a keyspace, connecting on first use or after it was closed. */
+ @Override
+ public Session getClusterSession(){
+
+ // always grab cluster from getCluster() in case it was prematurely closed
+ if ( clusterSession == null || clusterSession.isClosed() ){
+ clusterSession = getCluster().connect();
+ }
+
+ return clusterSession;
+ }
+ /** Returns the Session bound to the application keyspace, connecting on first use or after it was closed. */
+ @Override
+ public Session getApplicationSession(){
+ // the keyspace name is passed double-quoted (see CQLUtils.quote)
+ // always grab cluster from getCluster() in case it was prematurely closed
+ if ( applicationSession == null || applicationSession.isClosed() ){
+ applicationSession = getCluster().connect( CQLUtils.quote(cassandraFig.getApplicationKeyspace() ) );
+ }
+ return applicationSession;
+ }
+
+
+    /**
+     * Creates the application keyspace if it doesn't exist, then alters it so that an existing keyspace
+     * picks up the configured replication strategy and options. Blocks until the nodes agree on the schema.
+     *
+     * @throws Exception if the replication settings cannot be formatted or a statement fails
+     */
+    @Override
+    public void createOrUpdateKeyspace() throws Exception {
+        final Session session = getClusterSession();
+
+        final String keyspace = CQLUtils.quote( cassandraFig.getApplicationKeyspace() );
+        final String replication =
+            CQLUtils.getFormattedReplication( cassandraFig.getStrategy(), cassandraFig.getStrategyOptions() );
+
+        final String createApplicationKeyspace = String.format(
+            "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = %s", keyspace, replication );
+
+        final String updateApplicationKeyspace = String.format(
+            "ALTER KEYSPACE %s WITH replication = %s", keyspace, replication );
+
+        session.execute(createApplicationKeyspace);
+        // run the ALTER synchronously: a fire-and-forget executeAsync() drops failures and may still be
+        // in flight when the schema agreement check below runs
+        session.execute(updateApplicationKeyspace);
+
+        logger.info("Created/Updated keyspace: {}", cassandraFig.getApplicationKeyspace());
+
+        waitForSchemaAgreement();
+    }
+
+    /**
+     * Wait until all Cassandra nodes agree on the schema. Sleeps 100ms between checks.
+     * If the waiting thread is interrupted, its interrupt flag is restored and the wait ends early.
+     */
+    @Override
+    public void waitForSchemaAgreement() {
+
+        while ( !this.cluster.getMetadata().checkSchemaAgreement() ) {
+
+            //sleep and try it again
+            try {
+                Thread.sleep( 100 );
+            }
+            catch ( InterruptedException e ) {
+                // swallowing the interrupt would leave this loop impossible to cancel;
+                // restore the flag so the caller can see it, and stop waiting
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+    }
+ /** Builds a new driver Cluster from the CassandraFig settings; credentials are added only when both are set. */
+ public Cluster buildCluster(){
+ // the configured read consistency level becomes the default for all queries (see QueryOptions below)
+ ConsistencyLevel defaultConsistencyLevel;
+ try {
+ defaultConsistencyLevel = ConsistencyLevel.valueOf(cassandraFig.getReadCl());
+ } catch (IllegalArgumentException e){
+
+ logger.error("Unable to parse provided consistency level in property: {}, defaulting to: {}",
+ CassandraFig.READ_CL,
+ ConsistencyLevel.LOCAL_QUORUM);
+
+ defaultConsistencyLevel = ConsistencyLevel.LOCAL_QUORUM;
+ }
+
+ // use the configured local data center when one is set, otherwise build the policy without one
+ LoadBalancingPolicy loadBalancingPolicy;
+ if( !cassandraFig.getLocalDataCenter().isEmpty() ){
+
+ loadBalancingPolicy = new DCAwareRoundRobinPolicy.Builder()
+ .withLocalDc( cassandraFig.getLocalDataCenter() ).build();
+ }else{
+ loadBalancingPolicy = new DCAwareRoundRobinPolicy.Builder().build();
+ }
+ // core connections are half the configured count; the timeout is divided by 1000 to get seconds
+ final PoolingOptions poolingOptions = new PoolingOptions()
+ .setCoreConnectionsPerHost(HostDistance.LOCAL, cassandraFig.getConnections() / 2)
+ .setMaxConnectionsPerHost(HostDistance.LOCAL, cassandraFig.getConnections())
+ .setIdleTimeoutSeconds(cassandraFig.getTimeout() / 1000)
+ .setPoolTimeoutMillis(cassandraFig.getPoolTimeout());
+
+ final QueryOptions queryOptions = new QueryOptions()
+ .setConsistencyLevel(defaultConsistencyLevel);
+ // hosts are a comma-separated list; the entries are not trimmed
+ Cluster.Builder datastaxCluster = Cluster.builder()
+ .withClusterName(cassandraFig.getClusterName())
+ .addContactPoints(cassandraFig.getHosts().split(","))
+ .withCompression(ProtocolOptions.Compression.LZ4)
+ .withLoadBalancingPolicy(loadBalancingPolicy)
+ .withPoolingOptions(poolingOptions)
+ .withQueryOptions(queryOptions)
+ .withProtocolVersion(getProtocolVersion(cassandraFig.getVersion()));
+
+ // only add auth credentials if they were provided
+ if ( !cassandraFig.getUsername().isEmpty() && !cassandraFig.getPassword().isEmpty() ){
+ datastaxCluster.withCredentials(
+ cassandraFig.getUsername(),
+ cassandraFig.getPassword()
+ );
+ }
+
+
+ return datastaxCluster.build();
+
+ }
+
+    /**
+     * Picks the native protocol version to use for the configured Cassandra version.
+     *
+     * @param versionNumber the Cassandra version from configuration, e.g. "2.1"
+     * @return V3 for 2.1, V2 for 2.0, V1 for 1.2, otherwise ProtocolVersion.NEWEST_SUPPORTED
+     */
+    private ProtocolVersion getProtocolVersion(String versionNumber){
+
+        switch (versionNumber) {
+
+            case "2.1":
+                return ProtocolVersion.V3;
+            case "2.0":
+                return ProtocolVersion.V2;
+            case "1.2":
+                return ProtocolVersion.V1;
+            default:
+                // any other version string falls through to NEWEST_SUPPORTED
+                return ProtocolVersion.NEWEST_SUPPORTED;
+
+        }
+
+    }
+
+
+}
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
index f7a9926..4681674 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
@@ -19,8 +19,15 @@
package org.apache.usergrid.persistence.core.guice;
+import com.datastax.driver.core.Session;
import com.netflix.astyanax.Keyspace;
+import org.apache.usergrid.persistence.core.CassandraConfig;
+import org.apache.usergrid.persistence.core.CassandraConfigImpl;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.core.astyanax.*;
+import org.apache.usergrid.persistence.core.datastax.DataStaxCluster;
+import org.apache.usergrid.persistence.core.datastax.DataStaxSessionProvider;
+import org.apache.usergrid.persistence.core.datastax.impl.DataStaxClusterImpl;
import org.safehaus.guicyfig.GuicyFigModule;
import org.apache.usergrid.persistence.core.consistency.TimeService;
@@ -61,6 +68,12 @@
// bind our Cassandra cluster to the Astyanax Implementation
bind(CassandraCluster.class).to(CassandraClusterImpl.class).asEagerSingleton();
+ // bind our Datastax cluster
+ bind(DataStaxCluster.class).to(DataStaxClusterImpl.class).asEagerSingleton();
+
+ // bind our Session to the DataStaxSessionProvider
+ bind(Session.class).toProvider(DataStaxSessionProvider.class).asEagerSingleton();
+
// bind our keyspace to the AstyanaxKeyspaceProvider
bind(Keyspace.class).toProvider(AstyanaxKeyspaceProvider.class).asEagerSingleton();
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
index e02cafe..18427f7 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
@@ -27,6 +27,7 @@
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.usergrid.persistence.core.astyanax.*;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.core.migration.util.AstayanxUtils;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
@@ -205,4 +206,10 @@
UTF8Type.class.getSimpleName(), BytesType.class.getSimpleName(),
MultiTenantColumnFamilyDefinition.CacheOption.KEYS ) );
}
+
+ @Override
+ public Collection<TableDefinition> getTables() {
+ // no CQL tables are declared by this class
+ return Collections.emptyList();
+ }
}
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/Migration.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/Migration.java
index 9938b88..952f898 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/Migration.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/Migration.java
@@ -22,6 +22,7 @@
import java.util.Collection;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
/**
@@ -33,4 +34,6 @@
* Get the column families required for this implementation. If one does not exist it will be created.
*/
Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies();
+ /** Get the CQL table definitions required for this implementation; MigrationManagerImpl creates each one during migrate(). */
+ Collection<TableDefinition> getTables();
}
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java
index db694fe..f5f5d7b 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java
@@ -20,28 +20,27 @@
import java.util.Collection;
-import java.util.List;
-import java.util.Map;
import java.util.Set;
+import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.datastax.CQLUtils;
+import org.apache.usergrid.persistence.core.datastax.DataStaxCluster;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.migration.util.AstayanxUtils;
-import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
import com.netflix.astyanax.ddl.ColumnFamilyDefinition;
import com.netflix.astyanax.ddl.KeyspaceDefinition;
/**
- * Implementation of the migration manager to set up keyspace
+ * Implementation of the migration manager to set up column families / tables
*
* @author tnine
*/
@@ -51,44 +50,61 @@
private static final Logger logger = LoggerFactory.getLogger( MigrationManagerImpl.class );
+ private final CassandraFig cassandraFig;
private final Set<Migration> migrations;
private final Keyspace keyspace;
-
- private final MigrationManagerFig fig;
+ private final DataStaxCluster dataStaxCluster;
@Inject
- public MigrationManagerImpl( final Keyspace keyspace, final Set<Migration> migrations,
- MigrationManagerFig fig ) {
+ public MigrationManagerImpl( final CassandraFig cassandraFig, final Keyspace keyspace,
+ final Set<Migration> migrations, final DataStaxCluster dataStaxCluster) {
+ // cassandraFig is passed to CQLUtils when building table CQL; dataStaxCluster runs the keyspace and table statements.
+ this.cassandraFig = cassandraFig;
this.keyspace = keyspace;
this.migrations = migrations;
- this.fig = fig;
+ this.dataStaxCluster = dataStaxCluster;
}
@Override
public void migrate() throws MigrationException {
-
try {
- testAndCreateKeyspace();
+ dataStaxCluster.createOrUpdateKeyspace();
for ( Migration migration : migrations ) {
final Collection<MultiTenantColumnFamilyDefinition> columnFamilies = migration.getColumnFamilies();
+ final Collection<TableDefinition> tables = migration.getTables();
- if ( columnFamilies == null || columnFamilies.size() == 0 ) {
+
+ if ((columnFamilies == null || columnFamilies.size() == 0) &&
+ (tables == null || tables.size() == 0)) {
logger.warn(
- "Class {} implements {} but returns null column families for migration. Either implement this method or remove the interface from the class",
- migration.getClass(), Migration.class );
+ "Class {} implements {} but returns null for getColumnFamilies and getTables for migration. Either implement this method or remove the interface from the class",
+ migration.getClass().getSimpleName(), Migration.class.getSimpleName());
continue;
}
- for ( MultiTenantColumnFamilyDefinition cf : columnFamilies ) {
- testAndCreateColumnFamilyDef( cf );
+ if (columnFamilies != null && !columnFamilies.isEmpty()) {
+ for (MultiTenantColumnFamilyDefinition cf : columnFamilies) {
+ testAndCreateColumnFamilyDef(cf);
+ }
}
+
+
+ if ( tables != null && !tables.isEmpty() ) {
+ for (TableDefinition tableDefinition : tables) {
+
+ createTable(tableDefinition);
+
+ }
+ }
+
+
}
}
catch ( Throwable t ) {
@@ -116,96 +132,23 @@
logger.info( "Created column family {}", columnFamily.getColumnFamily().getName() );
- waitForMigration();
+ dataStaxCluster.waitForSchemaAgreement();
+ }
+
+    // Generates the CREATE statement for the given table, runs it on the application session,
+    // and then calls waitForSchemaAgreement() before returning.
+    private void createTable(TableDefinition tableDefinition ) throws Exception {
+        final String createStatement =
+            CQLUtils.getTableCQL( cassandraFig, tableDefinition, CQLUtils.ACTION.CREATE );
+        if ( logger.isDebugEnabled() ) {
+            logger.debug( createStatement );
+        }
+
+        dataStaxCluster.getApplicationSession().execute( createStatement );
+        logger.info( "Created table: {}", tableDefinition.getTableName() );
+        dataStaxCluster.waitForSchemaAgreement();
}
- /**
- * Check if they keyspace exists. If it doesn't create it
- */
- private void testAndCreateKeyspace() throws ConnectionException {
-
- KeyspaceDefinition keyspaceDefinition = null;
-
- try {
- keyspaceDefinition = keyspace.describeKeyspace();
-
- }catch( NotFoundException nfe){
- //if we execute this immediately after a drop keyspace in 1.2.x, Cassandra is returning the NFE instead of a BadRequestException
- //swallow and log, then continue to create the keyspaces.
- logger.info( "Received a NotFoundException when attempting to describe keyspace. It does not exist" );
- }
- catch(Exception e){
- AstayanxUtils.isKeyspaceMissing("Unable to connect to cassandra", e);
- }
-
-
- if ( keyspaceDefinition != null ) {
- return;
- }
-
-
- ImmutableMap.Builder<String, Object> strategyOptions = getKeySpaceProps();
-
-
- ImmutableMap<String, Object> options =
- ImmutableMap.<String, Object>builder().put( "strategy_class", fig.getStrategyClass() )
- .put( "strategy_options", strategyOptions.build() ).build();
-
-
- keyspace.createKeyspace( options );
-
- strategyOptions.toString();
-
- logger.info( "Created keyspace {} with options {}", keyspace.getKeyspaceName(), options.toString() );
-
- waitForMigration();
- }
-
-
- /**
- * Get keyspace properties
- */
- private ImmutableMap.Builder<String, Object> getKeySpaceProps() {
- ImmutableMap.Builder<String, Object> keyspaceProps = ImmutableMap.<String, Object>builder();
-
- String optionString = fig.getStrategyOptions();
-
- if(optionString == null){
- return keyspaceProps;
- }
-
-
-
- for ( String key : optionString.split( "," ) ) {
-
- final String[] options = key.split( ":" );
-
- keyspaceProps.put( options[0], options[1] );
- }
-
- return keyspaceProps;
- }
-
-
- private void waitForMigration() throws ConnectionException {
-
- while ( true ) {
-
- final Map<String, List<String>> versions = keyspace.describeSchemaVersions();
-
- if ( versions != null && versions.size() == 1 ) {
- return;
- }
-
- //sleep and try it again
- try {
- Thread.sleep( 100 );
- }
- catch ( InterruptedException e ) {
- //swallow
- }
- }
- }
}
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
index 18c9327..caa6294 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
@@ -24,8 +24,9 @@
import java.util.HashMap;
+import org.apache.usergrid.persistence.core.CassandraConfig;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -93,6 +94,22 @@
return ConsistencyLevel.CL_QUORUM;
}
+ // DataStax driver levels: reads LOCAL_ONE, consistent reads ALL, writes QUORUM. NOTE(review): fully qualified presumably to avoid clashing with the ConsistencyLevel used above — confirm.
+ @Override
+ public com.datastax.driver.core.ConsistencyLevel getDataStaxReadCl() {
+ return com.datastax.driver.core.ConsistencyLevel.LOCAL_ONE;
+ }
+
+ @Override
+ public com.datastax.driver.core.ConsistencyLevel getDataStaxReadConsistentCl() {
+ return com.datastax.driver.core.ConsistencyLevel.ALL;
+ }
+
+ @Override
+ public com.datastax.driver.core.ConsistencyLevel getDataStaxWriteCl() {
+ return com.datastax.driver.core.ConsistencyLevel.QUORUM;
+ }
+
@Override
public int[] getShardSettings() {
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
index bd1ea55..b31fa2f 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
@@ -26,8 +26,9 @@
import java.util.Comparator;
import java.util.HashMap;
+import org.apache.usergrid.persistence.core.CassandraConfig;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -50,8 +51,6 @@
import com.netflix.astyanax.util.RangeBuilder;
import rx.Observable;
-import rx.functions.Action1;
-import rx.functions.Func1;
import rx.schedulers.Schedulers;
import static org.junit.Assert.assertEquals;
@@ -98,6 +97,22 @@
return ConsistencyLevel.CL_QUORUM;
}
+ // DataStax driver levels: reads LOCAL_ONE, consistent reads ALL, writes QUORUM. NOTE(review): fully qualified presumably to avoid clashing with the ConsistencyLevel used above — confirm.
+ @Override
+ public com.datastax.driver.core.ConsistencyLevel getDataStaxReadCl() {
+ return com.datastax.driver.core.ConsistencyLevel.LOCAL_ONE;
+ }
+
+ @Override
+ public com.datastax.driver.core.ConsistencyLevel getDataStaxReadConsistentCl() {
+ return com.datastax.driver.core.ConsistencyLevel.ALL;
+ }
+
+ @Override
+ public com.datastax.driver.core.ConsistencyLevel getDataStaxWriteCl() {
+ return com.datastax.driver.core.ConsistencyLevel.QUORUM;
+ }
+
@Override
public int[] getShardSettings() {
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
index b6ee7fe..ea5359e 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
@@ -28,6 +28,8 @@
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
+import org.apache.usergrid.persistence.core.CassandraConfig;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -99,6 +101,22 @@
return ConsistencyLevel.CL_QUORUM;
}
+ // DataStax driver levels: reads LOCAL_ONE, consistent reads ALL, writes QUORUM. NOTE(review): fully qualified presumably to avoid clashing with the ConsistencyLevel used above — confirm.
+ @Override
+ public com.datastax.driver.core.ConsistencyLevel getDataStaxReadCl() {
+ return com.datastax.driver.core.ConsistencyLevel.LOCAL_ONE;
+ }
+
+ @Override
+ public com.datastax.driver.core.ConsistencyLevel getDataStaxReadConsistentCl() {
+ return com.datastax.driver.core.ConsistencyLevel.ALL;
+ }
+
+ @Override
+ public com.datastax.driver.core.ConsistencyLevel getDataStaxWriteCl() {
+ return com.datastax.driver.core.ConsistencyLevel.QUORUM;
+ }
+
@Override
public int[] getShardSettings() {
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/CQLUtilsTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/CQLUtilsTest.java
new file mode 100644
index 0000000..34dd370
--- /dev/null
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/CQLUtilsTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.core.datastax;
+
+
+import com.datastax.driver.core.DataType;
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.guice.TestCommonModule;
+import org.apache.usergrid.persistence.core.test.ITRunner;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Checks the CQL that {@link CQLUtils#getTableCQL} produces for a {@link TableDefinition}.
+ *
+ * Every test uses the same table definition (see {@link #newTestTable()}); only the
+ * {@link CassandraFig} and the requested action differ.
+ */
+@RunWith( ITRunner.class )
+@UseModules( TestCommonModule.class )
+public class CQLUtilsTest {
+
+    private static final Logger logger = LoggerFactory.getLogger( CQLUtilsTest.class );
+
+    @Inject
+    CassandraFig cassandraFig;
+
+
+    /**
+     * CREATE must yield a CREATE TABLE statement that names the column types; UPDATE must yield
+     * an ALTER TABLE statement that names none of them.
+     */
+    @Test
+    public void testTableCQL() throws Exception {
+
+        final TableDefinition table1 = newTestTable();
+
+        String createCQL = CQLUtils.getTableCQL(cassandraFig, table1, CQLUtils.ACTION.CREATE);
+        String updateCQL = CQLUtils.getTableCQL(cassandraFig, table1, CQLUtils.ACTION.UPDATE);
+
+        // log before asserting so the generated statements are visible when an assertion fails
+        logger.info(createCQL);
+        logger.info(updateCQL);
+
+        // one assertion per condition so a failure identifies exactly what was wrong
+        assertTrue( "create CQL should contain " + CQLUtils.CREATE_TABLE,
+            createCQL.contains( CQLUtils.CREATE_TABLE ) );
+        assertTrue( "create CQL should not contain " + CQLUtils.ALTER_TABLE,
+            !createCQL.contains( CQLUtils.ALTER_TABLE ) );
+        assertTrue( "create CQL should contain the BLOB column type",
+            createCQL.contains( DataType.Name.BLOB.toString() ) );
+        assertTrue( "create CQL should contain the TEXT column type",
+            createCQL.contains( DataType.Name.TEXT.toString() ) );
+
+        assertTrue( "update CQL should contain " + CQLUtils.ALTER_TABLE,
+            updateCQL.contains( CQLUtils.ALTER_TABLE ) );
+        assertTrue( "update CQL should not contain " + CQLUtils.CREATE_TABLE,
+            !updateCQL.contains( CQLUtils.CREATE_TABLE ) );
+        assertTrue( "update CQL should not contain the BLOB column type",
+            !updateCQL.contains( DataType.Name.BLOB.toString() ) );
+        assertTrue( "update CQL should not contain the TEXT column type",
+            !updateCQL.contains( DataType.Name.TEXT.toString() ) );
+    }
+
+    /**
+     * With a reported Cassandra version of 2.0 the caching option must use the "keys_only" form.
+     */
+    @Test
+    public void testLegacyCachingOptions() throws Exception{
+
+        final CassandraFig cassandraFig = mock(CassandraFig.class);
+        when(cassandraFig.getVersion()).thenReturn("2.0");
+
+        String createCQL = CQLUtils.getTableCQL(cassandraFig, newTestTable(), CQLUtils.ACTION.CREATE);
+        logger.info(createCQL);
+
+        assertTrue( "2.0 create CQL should use the \"keys_only\" caching form",
+            createCQL.contains( "\"keys_only\"" ) );
+        assertTrue( "2.0 create CQL should not use the 'keys':'ALL' caching form",
+            !createCQL.contains( "'keys':'ALL'" ) );
+    }
+
+    /**
+     * With a reported Cassandra version of 2.1 the caching option must use the 'keys':'ALL' form.
+     */
+    @Test
+    public void testCachingOptions() throws Exception {
+
+        final CassandraFig cassandraFig = mock(CassandraFig.class);
+        when(cassandraFig.getVersion()).thenReturn("2.1");
+
+        String createCQL = CQLUtils.getTableCQL(cassandraFig, newTestTable(), CQLUtils.ACTION.CREATE);
+        logger.info(createCQL);
+
+        assertTrue( "2.1 create CQL should use the 'keys':'ALL' caching form",
+            createCQL.contains( "'keys':'ALL'" ) );
+        assertTrue( "2.1 create CQL should not use the \"keys_only\" caching form",
+            !createCQL.contains( "\"keys_only\"" ) );
+    }
+
+    /**
+     * Builds the table definition shared by all tests: columns key (BLOB), column1 (TEXT) and
+     * value (BLOB); "key" as the only partition key; "column1" as the only column key, ordered
+     * DESC; cache option KEYS.
+     */
+    private static TableDefinition newTestTable() {
+
+        Map<String, DataType.Name> columns = new HashMap<>();
+        columns.put("key", DataType.Name.BLOB);
+        columns.put("column1", DataType.Name.TEXT);
+        columns.put("value", DataType.Name.BLOB);
+
+        List<String> partitionKeys = new ArrayList<>();
+        partitionKeys.add("key");
+
+        List<String> columnKeys = new ArrayList<>();
+        columnKeys.add("column1");
+
+        Map<String, String> clusteringOrder = new HashMap<>();
+        clusteringOrder.put("column1", "DESC");
+
+        return new TableDefinition(
+            CQLUtils.quote("table1"),
+            partitionKeys,
+            columnKeys,
+            columns,
+            TableDefinition.CacheOption.KEYS,
+            clusteringOrder
+        );
+    }
+
+}
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/DataStaxClusterTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/DataStaxClusterTest.java
new file mode 100644
index 0000000..227f11d
--- /dev/null
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/DataStaxClusterTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.core.datastax;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.guice.TestCommonModule;
+import org.apache.usergrid.persistence.core.test.ITRunner;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import static org.junit.Assert.*;
+
+/**
+ * Exercises the injected {@link DataStaxCluster}: re-opening a closed cluster, and the
+ * keyspace each of its two sessions is logged into.
+ */
+@RunWith( ITRunner.class )
+@UseModules( TestCommonModule.class )
+public class DataStaxClusterTest {
+
+    @Inject
+    DataStaxCluster dataStaxCluster;
+
+    @Inject
+    CassandraFig cassandraFig;
+
+
+    /** After the cluster is closed, getCluster() must hand back an open cluster again. */
+    @Test
+    public void testConnectCloseCluster() {
+
+        final Cluster initial = dataStaxCluster.getCluster();
+        assertFalse( initial.isClosed() );
+
+        initial.close();
+        assertTrue( initial.isClosed() );
+
+        // validate getCluster will re-init the cluster
+        final Cluster reopened = dataStaxCluster.getCluster();
+        assertFalse( reopened.isClosed() );
+    }
+
+    /** The cluster session reports a cluster name but no logged keyspace. */
+    @Test
+    public void testGetClusterSession() {
+
+        final Session clusterSession = dataStaxCluster.getClusterSession();
+        final String nameOfCluster = clusterSession.getCluster().getClusterName();
+        final String loggedKeyspace = clusterSession.getLoggedKeyspace();
+
+        // cluster session is not logged to a keyspace
+        assertNull( loggedKeyspace );
+        assertNotNull( nameOfCluster );
+    }
+
+    /** The application session is logged into the keyspace named by the CassandraFig. */
+    @Test
+    public void testGetApplicationSession() {
+
+        final Session applicationSession = dataStaxCluster.getApplicationSession();
+        final String loggedKeyspace = applicationSession.getLoggedKeyspace();
+
+        assertEquals( cassandraFig.getApplicationKeyspace(), loggedKeyspace );
+    }
+
+}
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/TableDefinitionTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/TableDefinitionTest.java
new file mode 100644
index 0000000..3acce69
--- /dev/null
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/TableDefinitionTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.core.datastax;
+
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Verifies that the {@link TableDefinition} constructor rejects a null required argument with a
+ * {@link NullPointerException} carrying the expected message.
+ *
+ * Each test fails explicitly when the constructor returns normally; without that, a constructor
+ * that performed no validation at all would let every test pass.
+ */
+public class TableDefinitionTest {
+
+    @Test
+    public void testNullTableName(){
+
+        try{
+            new TableDefinition(null, null, null, null, null, null);
+            // AssertionError is not a NullPointerException, so the catch below cannot swallow it
+            throw new AssertionError("Expected NullPointerException for a null table name");
+        } catch (NullPointerException npe){
+            assertEquals("Table name cannot be null", npe.getMessage());
+        }
+
+    }
+
+    @Test
+    public void testNullPrimaryKeys(){
+
+        try{
+            new TableDefinition("table1", null, null, null, null, null);
+            // AssertionError is not a NullPointerException, so the catch below cannot swallow it
+            throw new AssertionError("Expected NullPointerException for null primary keys");
+        } catch (NullPointerException npe){
+            assertEquals("Primary Key(s) cannot be null", npe.getMessage());
+        }
+
+    }
+
+    @Test
+    public void testNullColumns(){
+
+        try{
+            new TableDefinition("table1",
+                new ArrayList<>(), null, null, null, null);
+            // AssertionError is not a NullPointerException, so the catch below cannot swallow it
+            throw new AssertionError("Expected NullPointerException for null columns");
+        } catch (NullPointerException npe){
+            assertEquals("Columns cannot be null", npe.getMessage());
+        }
+
+    }
+
+    @Test
+    public void testNullCacheOption(){
+
+        try{
+            new TableDefinition("table1",
+                new ArrayList<>(),
+                new ArrayList<>(), new HashMap<>(), null, null);
+            // AssertionError is not a NullPointerException, so the catch below cannot swallow it
+            throw new AssertionError("Expected NullPointerException for a null cache option");
+        } catch (NullPointerException npe){
+            assertEquals("CacheOption cannot be null", npe.getMessage());
+        }
+
+    }
+}
diff --git a/stack/corepersistence/common/src/test/resources/usergrid-UNIT.properties b/stack/corepersistence/common/src/test/resources/usergrid-UNIT.properties
index c94ea57..f9e27f7 100644
--- a/stack/corepersistence/common/src/test/resources/usergrid-UNIT.properties
+++ b/stack/corepersistence/common/src/test/resources/usergrid-UNIT.properties
@@ -20,7 +20,6 @@
# Keep nothing but overriding test defaults in here
cassandra.connections=50
cassandra.port=9160
-cassandra.version=1.2
cassandra.hosts=localhost
cassandra.cluster_name=Usergrid
collections.keyspace=Usergrid_Collections
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java
index 44d0b73..46acd1c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java
@@ -26,6 +26,7 @@
import java.util.Collections;
import java.util.Iterator;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -288,6 +289,11 @@
return Collections.EMPTY_LIST;
}
+ @Override
+ public Collection<TableDefinition> getTables() {
+ // Returns an empty collection: this class defines no CQL tables.
+ return Collections.emptyList();
+ }
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV1Impl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV1Impl.java
index da4d044..d7a5c80 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV1Impl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV1Impl.java
@@ -22,12 +22,13 @@
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
@@ -36,6 +37,7 @@
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
import org.apache.usergrid.persistence.core.astyanax.StringColumnParser;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.core.migration.schema.Migration;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -376,6 +378,12 @@
graphCf( CF_SOURCE_EDGE_ID_TYPES ), graphCf( CF_TARGET_EDGE_ID_TYPES ) );
}
+ @Override
+ public Collection<TableDefinition> getTables() {
+ // Returns an empty collection: this class defines no CQL tables.
+ return Collections.emptyList();
+ }
+
/**
* Helper to generate an edge definition by the type
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
index 9b0257f..1f81864 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
@@ -22,19 +22,14 @@
package org.apache.usergrid.persistence.graph.serialization.impl;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKey;
import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKeySerializer;
-import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.astyanax.ColumnSearch;
import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
@@ -42,6 +37,7 @@
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.astyanax.StringColumnParser;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.core.migration.schema.Migration;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.shard.ExpandingShardLocator;
@@ -470,6 +466,12 @@
graphCf( CF_SOURCE_EDGE_ID_TYPES ), graphCf( CF_TARGET_EDGE_ID_TYPES ) );
}
+ @Override
+ public Collection<TableDefinition> getTables() {
+ // Returns an empty collection: this class defines no CQL tables.
+ return Collections.emptyList();
+ }
+
/**
* Helper to generate an edge definition by the type
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
index 0f4d722..984365f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
@@ -27,7 +27,7 @@
import javax.inject.Inject;
import com.google.common.base.Optional;
-import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
index cd803e8..2429d5a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
@@ -32,12 +32,13 @@
import org.apache.cassandra.db.marshal.BooleanType;
import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.core.migration.schema.Migration;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -110,6 +111,12 @@
MultiTenantColumnFamilyDefinition.CacheOption.ALL ) );
}
+ @Override
+ public Collection<TableDefinition> getTables() {
+ // Returns an empty collection: this class defines no CQL tables.
+ return Collections.emptyList();
+ }
+
@Override
public MutationBatch mark( final ApplicationScope scope, final Id node, final long timestamp ) {
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
index 13ca68f..098c152 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
@@ -24,10 +24,9 @@
import java.util.Collections;
import java.util.Iterator;
-import com.google.common.util.concurrent.ExecutionError;
import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
@@ -35,12 +34,12 @@
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.EdgeShardRowKeySerializer;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.ShardSerializer;
@@ -188,6 +187,12 @@
MultiTenantColumnFamilyDefinition.CacheOption.KEYS ) );
}
+    /**
+     * CQL table definitions owned by this class. None are declared here,
+     * so the returned collection is always empty.
+     */
+    @Override
+    public Collection<TableDefinition> getTables() {
+        return Collections.<TableDefinition>emptyList();
+    }
+
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
index 65a6f40..8259df4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
@@ -30,7 +30,7 @@
import javax.inject.Inject;
import com.google.common.base.Optional;
-import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
import org.apache.usergrid.persistence.core.consistency.TimeService;
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java
index 9185ac8..f4e19d6 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java
@@ -23,6 +23,7 @@
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.DynamicCompositeType;
@@ -32,6 +33,7 @@
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey;
@@ -143,6 +145,12 @@
MultiTenantColumnFamilyDefinition.CacheOption.ALL ) );
}
+    /**
+     * CQL table definitions owned by this class. None are declared here,
+     * so the returned collection is always empty.
+     */
+    @Override
+    public Collection<TableDefinition> getTables() {
+        return Collections.<TableDefinition>emptyList();
+    }
+
/**
* Helper to generate an edge definition by the type
diff --git a/stack/corepersistence/graph/src/test/resources/usergrid-AWS.properties b/stack/corepersistence/graph/src/test/resources/usergrid-AWS.properties
index e19f676..e737815 100644
--- a/stack/corepersistence/graph/src/test/resources/usergrid-AWS.properties
+++ b/stack/corepersistence/graph/src/test/resources/usergrid-AWS.properties
@@ -1,12 +1,10 @@
# Keep nothing but overriding test defaults in here
-cassandra.connections=100
+cassandra.connections=50
cassandra.port=9160
-cassandra.version=1.2
cassandra.hosts=
#cassandra.hosts=localhost
cassandra.cluster_name=Usergrid
collections.keyspace=Usergrid_Collections
-cassandra.timeout=5000
collections.keyspace.strategy.options=us-east:3
diff --git a/stack/corepersistence/graph/src/test/resources/usergrid-CHOP.properties b/stack/corepersistence/graph/src/test/resources/usergrid-CHOP.properties
index 78e3400..92d0041 100644
--- a/stack/corepersistence/graph/src/test/resources/usergrid-CHOP.properties
+++ b/stack/corepersistence/graph/src/test/resources/usergrid-CHOP.properties
@@ -1,15 +1,13 @@
# These are for CHOP environment settings
-cassandra.connections=20
+cassandra.connections=50
cassandra.port=9160
-cassandra.version=1.2
# a comma delimited private IP address list to your chop cassandra cluster
# define this in your settings.xml and have it as an always active profile
cassandra.hosts=${chop.cassandra.hosts}
cassandra.cluster_name=Usergrid
collections.keyspace=Usergrid_Collections
-cassandra.timeout=5000
# This property is required to be set and cannot be defaulted anywhere
usergrid.cluster_name=usergrid
diff --git a/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties b/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties
index eb06001..6f8a7c5 100644
--- a/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties
+++ b/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties
@@ -1,11 +1,9 @@
# Keep nothing but overriding test defaults in here
-cassandra.connections=40
+cassandra.connections=50
cassandra.port=9160
-cassandra.version=1.2
cassandra.hosts=localhost
cassandra.cluster_name=Usergrid
collections.keyspace=Usergrid_Collections
-cassandra.timeout=2000
cassandra.embedded=true
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
index 965f5f9..cafa4a1 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
@@ -57,7 +57,6 @@
return mapSerialization.getString( scope, key );
}
-
@Override
public String getStringHighConsistency( final String key ) {
return mapSerialization.getStringHighConsistency(scope, key);
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
index f90f80c..735f2b8 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
@@ -20,25 +20,18 @@
package org.apache.usergrid.persistence.map.impl;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.nio.ByteBuffer;
+import java.util.*;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.UTF8Type;
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Using;
-import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKey;
-import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKeySerializer;
-import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
-import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
-import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
+import org.apache.usergrid.persistence.core.datastax.CQLUtils;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.core.shard.ExpandingShardLocator;
import org.apache.usergrid.persistence.core.shard.StringHashUtils;
import org.apache.usergrid.persistence.map.MapScope;
@@ -47,57 +40,42 @@
import com.google.common.hash.Funnel;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import com.netflix.astyanax.ColumnListMutation;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
-import com.netflix.astyanax.model.Column;
-import com.netflix.astyanax.model.CompositeBuilder;
-import com.netflix.astyanax.model.CompositeParser;
-import com.netflix.astyanax.model.ConsistencyLevel;
-import com.netflix.astyanax.model.Row;
-import com.netflix.astyanax.model.Rows;
-import com.netflix.astyanax.serializers.BooleanSerializer;
-import com.netflix.astyanax.serializers.StringSerializer;
@Singleton
public class MapSerializationImpl implements MapSerialization {
- private static final MapKeySerializer KEY_SERIALIZER = new MapKeySerializer();
- private static final BucketScopedRowKeySerializer<String> MAP_KEY_SERIALIZER =
- new BucketScopedRowKeySerializer<>( KEY_SERIALIZER );
+ private static final String MAP_ENTRIES_TABLE = CQLUtils.quote("Map_Entries");
+ private static final Collection<String> MAP_ENTRIES_PARTITION_KEYS = Collections.singletonList("key");
+ private static final Collection<String> MAP_ENTRIES_COLUMN_KEYS = Collections.singletonList("column1");
+ private static final Map<String, DataType.Name> MAP_ENTRIES_COLUMNS =
+ new HashMap<String, DataType.Name>() {{
+ put( "key", DataType.Name.BLOB );
+ put( "column1", DataType.Name.BLOB );
+ put( "value", DataType.Name.BLOB ); }};
+ private static final Map<String, String> MAP_ENTRIES_CLUSTERING_ORDER =
+ new HashMap<String, String>(){{ put( "column1", "ASC" ); }};
- private static final MapEntrySerializer ENTRY_SERIALIZER = new MapEntrySerializer();
- private static final ScopedRowKeySerializer<MapEntryKey> MAP_ENTRY_SERIALIZER =
- new ScopedRowKeySerializer<>( ENTRY_SERIALIZER );
+ private static final String MAP_KEYS_TABLE = CQLUtils.quote("Map_Keys");
+ private static final Collection<String> MAP_KEYS_PARTITION_KEYS = Collections.singletonList("key");
+ private static final Collection<String> MAP_KEYS_COLUMN_KEYS = Collections.singletonList("column1");
+ private static final Map<String, DataType.Name> MAP_KEYS_COLUMNS =
+ new HashMap<String, DataType.Name>() {{
+ put( "key", DataType.Name.BLOB );
+ put( "column1", DataType.Name.BLOB );
+ put( "value", DataType.Name.BLOB ); }};
+ private static final Map<String, String> MAP_KEYS_CLUSTERING_ORDER =
+ new HashMap<String, String>(){{ put( "column1", "ASC" ); }};
- private static final BooleanSerializer BOOLEAN_SERIALIZER = BooleanSerializer.get();
-
- private static final StringSerializer STRING_SERIALIZER = StringSerializer.get();
- private static final StringResultsBuilder STRING_RESULTS_BUILDER = new StringResultsBuilder();
+ private static final StringResultsBuilderCQL STRING_RESULTS_BUILDER_CQL = new StringResultsBuilderCQL();
/**
- * CFs where the row key contains the source node id
- */
- public static final MultiTenantColumnFamily<ScopedRowKey<MapEntryKey>, Boolean> MAP_ENTRIES =
- new MultiTenantColumnFamily<>( "Map_Entries", MAP_ENTRY_SERIALIZER, BOOLEAN_SERIALIZER );
-
-
- /**
- * CFs where the row key contains the source node id
- */
- public static final MultiTenantColumnFamily<BucketScopedRowKey<String>, String> MAP_KEYS =
- new MultiTenantColumnFamily<>( "Map_Keys", MAP_KEY_SERIALIZER, STRING_SERIALIZER );
-
- /**
* Number of buckets to hash across.
*/
private static final int[] NUM_BUCKETS = { 20 };
@@ -114,140 +92,111 @@
private static final ExpandingShardLocator<String> BUCKET_LOCATOR =
new ExpandingShardLocator<>( MAP_KEY_FUNNEL, NUM_BUCKETS );
- private final Keyspace keyspace;
private final CassandraConfig cassandraConfig;
+ private final Session session;
+
@Inject
- public MapSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig ) {
- this.keyspace = keyspace;
+ public MapSerializationImpl( final CassandraConfig cassandraConfig, final Session session ) {
+ this.session = session;
this.cassandraConfig = cassandraConfig;
}
@Override
public String getString( final MapScope scope, final String key ) {
- Column<Boolean> col = getValue( scope, key, cassandraConfig.getReadCL() );
- return ( col != null ) ? col.getStringValue() : null;
+
+ ByteBuffer value = getValueCQL( scope, key, cassandraConfig.getDataStaxReadCl() ) ;
+ return value != null ? (String)DataType.text().deserialize(value,ProtocolVersion.NEWEST_SUPPORTED ): null;
}
@Override
public String getStringHighConsistency( final MapScope scope, final String key ) {
- Column<Boolean> col = getValue( scope, key, cassandraConfig.getConsistentReadCL() ); // TODO: why boolean?
- return ( col != null ) ? col.getStringValue() : null;
+
+ ByteBuffer value = getValueCQL( scope, key, cassandraConfig.getDataStaxReadConsistentCl() ) ;
+ return value != null ? (String)DataType.text().deserialize(value,ProtocolVersion.NEWEST_SUPPORTED ): null;
}
@Override
public Map<String, String> getStrings( final MapScope scope, final Collection<String> keys ) {
- return getValues( scope, keys, STRING_RESULTS_BUILDER );
+ return getValuesCQL( scope, keys, STRING_RESULTS_BUILDER_CQL );
}
@Override
public void putString( final MapScope scope, final String key, final String value ) {
- final RowOp op = new RowOp() {
- @Override
- public void putValue( final ColumnListMutation<Boolean> columnListMutation ) {
- columnListMutation.putColumn( true, value );
- }
-
- @Override
- public void putKey( final ColumnListMutation<String> keysMutation ) {
- keysMutation.putColumn( key, true );
- }
- };
-
-
- writeString( scope, key, value, op );
+ writeStringCQL( scope, key, value, -1 );
}
@Override
public void putString( final MapScope scope, final String key, final String value, final int ttl ) {
+
Preconditions.checkArgument( ttl > 0, "ttl must be > than 0" );
-
- final RowOp op = new RowOp() {
- @Override
- public void putValue( final ColumnListMutation<Boolean> columnListMutation ) {
- columnListMutation.putColumn( true, value, ttl );
- }
-
-
- @Override
- public void putKey( final ColumnListMutation<String> keysMutation ) {
- keysMutation.putColumn( key, true, ttl );
- }
- };
-
-
- writeString( scope, key, value, op );
+ writeStringCQL( scope, key, value, ttl );
}
/**
* Write our string index with the specified row op
*/
- private void writeString( final MapScope scope, final String key, final String value, final RowOp rowOp ) {
+    private void writeStringCQL( final MapScope scope, final String key, final String value, int ttl ) {
Preconditions.checkNotNull( scope, "mapscope is required" );
Preconditions.checkNotNull( key, "key is required" );
Preconditions.checkNotNull( value, "value is required" );
- final MutationBatch batch = keyspace.prepareMutationBatch();
+        // Two rows are written per put:
+        //  - Map_Entries: column1 = boolean marker, value = the string itself
+        //  - Map_Keys:    column1 = the key text,   value = boolean marker
+        // The Map_Keys layout mirrors the removed Astyanax write ( putColumn( key, true ) )
+        // and the column1 that putUuid/putLong write into the same table.
+        // A ttl <= 0 means "no expiry"; putString without a ttl passes -1.
+        Statement mapEntry;
+        Statement mapKey;
+        if (ttl > 0){
+            Using timeToLive = QueryBuilder.ttl(ttl);
- //add it to the entry
- final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
-
- //serialize to the
- // entry
+            mapEntry = QueryBuilder.insertInto(MAP_ENTRIES_TABLE)
+                .using(timeToLive)
+                .value("key", getMapEntryPartitionKey(scope, key))
+                .value("column1", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED))
+                .value("value", DataType.text().serialize(value, ProtocolVersion.NEWEST_SUPPORTED));
- rowOp.putValue( batch.withRow( MAP_ENTRIES, entryRowKey ) );
+            // get a bucket number for the map keys table
+            final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
+            mapKey = QueryBuilder.insertInto(MAP_KEYS_TABLE)
+                .using(timeToLive)
+                .value("key", getMapKeyPartitionKey(scope, key, bucket))
+                .value("column1", DataType.text().serialize(key, ProtocolVersion.NEWEST_SUPPORTED))
+                .value("value", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED));
+        }else{
+            mapEntry = QueryBuilder.insertInto(MAP_ENTRIES_TABLE)
+                .value("key", getMapEntryPartitionKey(scope, key))
+                .value("column1", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED))
+                .value("value", DataType.text().serialize(value, ProtocolVersion.NEWEST_SUPPORTED));
- //add it to the keys
+            // get a bucket number for the map keys table
+            final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
- final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
+            mapKey = QueryBuilder.insertInto(MAP_KEYS_TABLE)
+                .value("key", getMapKeyPartitionKey(scope, key, bucket))
+                .value("column1", DataType.text().serialize(key, ProtocolVersion.NEWEST_SUPPORTED))
+                .value("value", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED));
- final BucketScopedRowKey<String> keyRowKey = BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket );
+        }
- //serialize to the entry
+        session.execute(mapEntry);
+        session.execute(mapKey);
- rowOp.putKey( batch.withRow( MAP_KEYS, keyRowKey ) );
-
-
- executeBatch( batch );
}
- /**
- * Callbacks for performing row operations
- */
- private static interface RowOp {
-
- /**
- * Callback to do the row
- *
- * @param columnListMutation The column mutation
- */
- void putValue( final ColumnListMutation<Boolean> columnListMutation );
-
-
- /**
- * Write the key
- */
- void putKey( final ColumnListMutation<String> keysMutation );
- }
-
@Override
public UUID getUuid( final MapScope scope, final String key ) {
- Column<Boolean> col = getValue( scope, key, cassandraConfig.getReadCL() );
- return ( col != null ) ? col.getUUIDValue() : null;
+ ByteBuffer value = getValueCQL( scope, key, cassandraConfig.getDataStaxReadCl() );
+ return value != null ? (UUID)DataType.uuid().deserialize(value, ProtocolVersion.NEWEST_SUPPORTED ) : null;
}
@@ -258,31 +207,34 @@
Preconditions.checkNotNull( key, "key is required" );
Preconditions.checkNotNull( putUuid, "value is required" );
- final MutationBatch batch = keyspace.prepareMutationBatch();
- //add it to the entry
- final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
+ Statement mapEntry = QueryBuilder.insertInto(MAP_ENTRIES_TABLE)
+ .value("key", getMapEntryPartitionKey(scope, key))
+ .value("column1", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED))
+ .value("value", DataType.uuid().serialize(putUuid, ProtocolVersion.NEWEST_SUPPORTED));
- //serialize to the entry
- batch.withRow( MAP_ENTRIES, entryRowKey ).putColumn( true, putUuid );
+ session.execute(mapEntry);
- //add it to the keys
final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
+ Statement mapKey;
+ mapKey = QueryBuilder.insertInto(MAP_KEYS_TABLE)
+ .value("key", getMapKeyPartitionKey(scope, key, bucket))
+ .value("column1", DataType.text().serialize(key, ProtocolVersion.NEWEST_SUPPORTED))
+ .value("value", DataType.serializeValue(null, ProtocolVersion.NEWEST_SUPPORTED));
- final BucketScopedRowKey<String> keyRowKey = BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket );
-
- //serialize to the entry
- batch.withRow( MAP_KEYS, keyRowKey ).putColumn( key, true );
-
- executeBatch( batch );
+ session.execute(mapKey);
}
+
+
+
@Override
public Long getLong( final MapScope scope, final String key ) {
- Column<Boolean> col = getValue( scope, key, cassandraConfig.getReadCL() );
- return ( col != null ) ? col.getLongValue() : null;
+
+ ByteBuffer value = getValueCQL( scope, key, cassandraConfig.getDataStaxReadCl());
+ return value != null ? (Long)DataType.bigint().deserialize(value, ProtocolVersion.NEWEST_SUPPORTED ) : null;
}
@@ -293,235 +245,221 @@
Preconditions.checkNotNull( key, "key is required" );
Preconditions.checkNotNull( value, "value is required" );
- final MutationBatch batch = keyspace.prepareMutationBatch();
+ Statement mapEntry = QueryBuilder.insertInto(MAP_ENTRIES_TABLE)
+ .value("key", getMapEntryPartitionKey(scope, key))
+ .value("column1", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED))
+ .value("value", DataType.bigint().serialize(value, ProtocolVersion.NEWEST_SUPPORTED));
- //add it to the entry
- final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
+ session.execute(mapEntry);
- //serialize to the entry
- batch.withRow( MAP_ENTRIES, entryRowKey ).putColumn( true, value );
- //add it to the keys
final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
+ Statement mapKey;
+ mapKey = QueryBuilder.insertInto(MAP_KEYS_TABLE)
+ .value("key", getMapKeyPartitionKey(scope, key, bucket))
+ .value("column1", DataType.text().serialize(key, ProtocolVersion.NEWEST_SUPPORTED))
+ .value("value", DataType.serializeValue(null, ProtocolVersion.NEWEST_SUPPORTED));
- final BucketScopedRowKey<String> keyRowKey = BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket );
-
- //serialize to the entry
- batch.withRow( MAP_KEYS, keyRowKey ).putColumn( key, true );
-
- executeBatch( batch );
+ session.execute(mapKey);
}
@Override
public void delete( final MapScope scope, final String key ) {
- final MutationBatch batch = keyspace.prepareMutationBatch();
- final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
- //serialize to the entry
- batch.withRow( MAP_ENTRIES, entryRowKey ).delete();
+ Statement deleteMapEntry;
+ Clause equalsEntryKey = QueryBuilder.eq("key", getMapEntryPartitionKey(scope, key));
+ deleteMapEntry = QueryBuilder.delete().from(MAP_ENTRIES_TABLE)
+ .where(equalsEntryKey);
+ session.execute(deleteMapEntry);
- //add it to the keys, we're not sure which one it may have come from
+
+
+ // not sure which bucket the value is in, execute a delete against them all
final int[] buckets = BUCKET_LOCATOR.getAllBuckets( key );
-
-
- final List<BucketScopedRowKey<String>> rowKeys =
- BucketScopedRowKey.fromRange( scope.getApplication(), key, buckets );
-
- for ( BucketScopedRowKey<String> rowKey : rowKeys ) {
- batch.withRow( MAP_KEYS, rowKey ).deleteColumn( key );
+ List<ByteBuffer> mapKeys = new ArrayList<>();
+ for( int bucket : buckets){
+ mapKeys.add( getMapKeyPartitionKey(scope, key, bucket));
}
- executeBatch( batch );
+ Statement deleteMapKey;
+ Clause inKey = QueryBuilder.in("key", mapKeys);
+ deleteMapKey = QueryBuilder.delete().from(MAP_KEYS_TABLE)
+ .where(inKey);
+ session.execute(deleteMapKey);
+
+
}
@Override
public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
- final MultiTenantColumnFamilyDefinition mapEntries =
- new MultiTenantColumnFamilyDefinition( MAP_ENTRIES, BytesType.class.getSimpleName(),
- BytesType.class.getSimpleName(), BytesType.class.getSimpleName(),
- MultiTenantColumnFamilyDefinition.CacheOption.KEYS );
+ // This here only until all traces of Astyanax are removed.
+ return Collections.emptyList();
- final MultiTenantColumnFamilyDefinition mapKeys =
- new MultiTenantColumnFamilyDefinition( MAP_KEYS, BytesType.class.getSimpleName(),
- UTF8Type.class.getSimpleName(), BytesType.class.getSimpleName(),
- MultiTenantColumnFamilyDefinition.CacheOption.KEYS );
+ }
+
+
+ @Override
+ public Collection<TableDefinition> getTables() {
+
+ final TableDefinition mapEntries =
+ new TableDefinition( MAP_ENTRIES_TABLE, MAP_ENTRIES_PARTITION_KEYS, MAP_ENTRIES_COLUMN_KEYS,
+ MAP_ENTRIES_COLUMNS, TableDefinition.CacheOption.KEYS, MAP_ENTRIES_CLUSTERING_ORDER);
+
+ final TableDefinition mapKeys =
+ new TableDefinition( MAP_KEYS_TABLE, MAP_KEYS_PARTITION_KEYS, MAP_KEYS_COLUMN_KEYS,
+ MAP_KEYS_COLUMNS, TableDefinition.CacheOption.KEYS, MAP_KEYS_CLUSTERING_ORDER);
+
+
return Arrays.asList( mapEntries, mapKeys );
+
}
- private Column<Boolean> getValue( MapScope scope, String key, final ConsistencyLevel consistencyLevel ) {
+ private ByteBuffer getValueCQL( MapScope scope, String key, final ConsistencyLevel consistencyLevel ) {
- //add it to the entry
- final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
+ Clause in = QueryBuilder.in("key", getMapEntryPartitionKey(scope, key) );
+ Statement statement = QueryBuilder.select().all().from(MAP_ENTRIES_TABLE)
+ .where(in)
+ .setConsistencyLevel(consistencyLevel);
- //now get all columns, including the "old row key value"
- try {
- final Column<Boolean> result =
- keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( consistencyLevel ).getKey( entryRowKey ).getColumn( true ).execute().getResult();
+ ResultSet resultSet = session.execute(statement);
+ com.datastax.driver.core.Row row = resultSet.one();
- return result;
- }
- catch ( NotFoundException nfe ) {
- //nothing to return
- return null;
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to connect to cassandra", e );
- }
+ return row != null ? row.getBytes("value") : null;
}
- /**
- * Get multiple values, using the string builder
- */
- private <T> T getValues( final MapScope scope, final Collection<String> keys, final ResultsBuilder<T> builder ) {
- final List<ScopedRowKey<MapEntryKey>> rowKeys = new ArrayList<>( keys.size() );
+    /**
+     * Reads the Map_Entries rows for several keys in one IN query and hands the
+     * result set to the supplied builder.
+     *
+     * @param scope   map scope the keys belong to
+     * @param keys    map keys to look up
+     * @param builder converts the raw result set into the caller's result type
+     * @return whatever the builder produces for the fetched rows
+     */
+    private <T> T getValuesCQL( final MapScope scope, final Collection<String> keys, final ResultsBuilderCQL<T> builder ) {
- for ( final String key : keys ) {
- //add it to the entry
- final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
+        final List<ByteBuffer> serializedKeys = new ArrayList<>( keys.size() );
- rowKeys.add( entryRowKey );
- }
+        for ( final String key : keys ) {
+            serializedKeys.add( getMapEntryPartitionKey( scope, key ) );
+        }
+
+        // The removed Astyanax query read with cassandraConfig.getReadCL(); keep the
+        // configured read consistency instead of falling back to the driver default.
+        Clause in = QueryBuilder.in("key", serializedKeys );
+        Statement statement = QueryBuilder.select().all().from(MAP_ENTRIES_TABLE)
+            .where(in)
+            .setConsistencyLevel(cassandraConfig.getDataStaxReadCl());
- //now get all columns, including the "old row key value"
- try {
- final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows =
- keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( cassandraConfig.getReadCL() ).getKeySlice(
- rowKeys ).withColumnSlice( true ).execute()
- .getResult();
+        ResultSet resultSet = session.execute(statement);
-
- return builder.buildResults( rows );
- }
- catch ( NotFoundException nfe ) {
- //nothing to return
- return null;
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to connect to cassandra", e );
- }
+        return builder.buildResultsCQL( resultSet );
}
- private void executeBatch( MutationBatch batch ) {
- try {
- batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to connect to cassandra", e );
- }
- }
-
-
- /**
- * Inner class to serialize and edgeIdTypeKey
- */
- private static class MapKeySerializer implements CompositeFieldSerializer<String> {
-
-
- @Override
- public void toComposite( final CompositeBuilder builder, final String key ) {
- builder.addString( key );
- }
-
-
- @Override
- public String fromComposite( final CompositeParser composite ) {
- final String key = composite.readString();
-
- return key;
- }
- }
-
-
- /**
- * Inner class to serialize and edgeIdTypeKey
- */
- private static class MapEntrySerializer implements CompositeFieldSerializer<MapEntryKey> {
-
- @Override
- public void toComposite( final CompositeBuilder builder, final MapEntryKey key ) {
-
- builder.addString( key.mapName );
- builder.addString( key.key );
- }
-
-
- @Override
- public MapEntryKey fromComposite( final CompositeParser composite ) {
-
- final String mapName = composite.readString();
-
- final String entryKey = composite.readString();
-
- return new MapEntryKey( mapName, entryKey );
- }
- }
-
-
- /**
- * Entries for serializing map entries and keys to a row
- */
- private static class MapEntryKey {
- public final String mapName;
- public final String key;
-
-
- private MapEntryKey( final String mapName, final String key ) {
- this.mapName = mapName;
- this.key = key;
- }
-
-
- /**
- * Create a scoped row key from the key
- */
- public static ScopedRowKey<MapEntryKey> fromKey( final MapScope mapScope, final String key ) {
-
- return ScopedRowKey.fromKey( mapScope.getApplication(), new MapEntryKey( mapScope.getName(), key ) );
- }
- }
-
/**
* Build the results from the row keys
*/
- private static interface ResultsBuilder<T> {
- public T buildResults( final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows );
+ private interface ResultsBuilderCQL<T> {
+
+ T buildResultsCQL( final ResultSet resultSet );
}
- public static class StringResultsBuilder implements ResultsBuilder<Map<String, String>> {
+    /**
+     * Builds a map of key to string value from Map_Entries rows. Rows whose
+     * "value" column is null are left out, as the removed Astyanax builder did.
+     */
+    public static class StringResultsBuilderCQL implements ResultsBuilderCQL<Map<String, String>> {
@Override
- public Map<String, String> buildResults( final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows ) {
- final int size = rows.size();
+        public Map<String, String> buildResultsCQL( final ResultSet resultSet ) {
- final Map<String, String> results = new HashMap<>( size );
- for ( int i = 0; i < size; i++ ) {
+            final Map<String, String> results = new HashMap<>();
- final Row<ScopedRowKey<MapEntryKey>, Boolean> row = rows.getRowByIndex( i );
+            resultSet.all().forEach( row -> {
- final String value = row.getColumns().getStringValue( true, null );
+                final ByteBuffer rawValue = row.getBytes("value");
+                if ( rawValue == null ) {
+                    // nothing stored for this row; skip it rather than deserialize null
+                    return;
+                }
+
+                @SuppressWarnings("unchecked")
+                List<Object> keys = (List) deserializeMapEntryKey(row.getBytes("key"));
+                String value = (String)DataType.text().deserialize( rawValue,
+                    ProtocolVersion.NEWEST_SUPPORTED );
- if ( value == null ) {
- continue;
- }
+                // the actual string key value is the last element
+                results.put((String)keys.get(keys.size() -1), value);
- results.put( row.getKey().getKey().key, value );
- }
+            });
return results;
}
}
+
+    /**
+     * Splits a serialized Map_Entries partition key into its components.
+     * The framing matches what serializeKeys writes: a 2-byte length, the
+     * component bytes, then one trailing byte. The first component is decoded
+     * as a UUID and every later component as text.
+     *
+     * @param bb serialized key; its position is advanced to the end while reading
+     * @return the decoded components in the order they were written
+     */
+    private static List<Object> deserializeMapEntryKey(ByteBuffer bb){
+
+        final List<Object> components = new ArrayList<>();
+        while(bb.hasRemaining()){
+            // presumably consumes the length prefix plus the component bytes from bb
+            ByteBuffer data = CQLUtils.getWithShortLength(bb);
+            if(components.isEmpty()){
+                components.add(DataType.uuid().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+            }else{
+                components.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+            }
+            // the trailing byte after each component is not needed, just consume it
+            bb.get();
+        }
+
+        return components;
+
+    }
+
+    /**
+     * Builds the composite partition key used by the map tables. Each component
+     * (owner UUID, owner type, map name, map key and, optionally, the bucket
+     * number) is written as a 2-byte length, the serialized bytes and a single
+     * zero byte.
+     *
+     * @param ownerUUID    UUID of the owning application
+     * @param ownerType    type of the owning application
+     * @param mapName      name of the map
+     * @param mapKey       key inside the map
+     * @param bucketNumber bucket to append; values {@code <= 0} append nothing
+     * @return a buffer positioned at the start of the serialized key
+     * @throws IllegalArgumentException if a component is longer than 65535 bytes
+     */
+    public static ByteBuffer serializeKeys(UUID ownerUUID, String ownerType, String mapName, String mapKey,
+                                           int bucketNumber ){
+
+        final List<Object> keys = new ArrayList<>(5);
+        keys.add(ownerUUID);
+        keys.add(ownerType);
+        keys.add(mapName);
+        keys.add(mapKey);
+
+        // NOTE(review): callers pass -1 for "no bucket", but this test also drops
+        // bucket 0. Left as is because changing it alters the stored key format.
+        if( bucketNumber > 0){
+            keys.add(bucketNumber);
+        }
+
+        // Serialize first and size the buffer from the real byte counts. Sizing from
+        // String.length() under-allocates as soon as a name contains non-ASCII text.
+        final List<ByteBuffer> serialized = new ArrayList<>(keys.size());
+        int size = 0;
+        for (Object key : keys) {
+
+            ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+            if (kb == null) {
+                kb = ByteBuffer.allocate(0);
+            }
+            if (kb.remaining() > 0xFFFF) {
+                throw new IllegalArgumentException(
+                    "key component of " + kb.remaining() + " bytes exceeds the 65535 byte limit");
+            }
+
+            serialized.add(kb);
+            // 2 byte length prefix + payload + 1 trailing byte
+            size += 2 + kb.remaining() + 1;
+        }
+
+        final ByteBuffer stuff = ByteBuffer.allocate(size);
+        for (ByteBuffer kb : serialized) {
+            stuff.putShort((short) kb.remaining());
+            stuff.put(kb.slice());
+            stuff.put((byte) 0);
+        }
+        stuff.flip();
+        return stuff.duplicate();
+
+    }
+
+
+    /**
+     * Partition key for a Map_Entries row: application id and type, map name
+     * and key, with no bucket component (-1 is passed as the bucket).
+     */
+    private ByteBuffer getMapEntryPartitionKey(MapScope scope, String key){
+
+        final UUID ownerUuid = scope.getApplication().getUuid();
+        final String ownerType = scope.getApplication().getType();
+
+        return serializeKeys(ownerUuid, ownerType, scope.getName(), key, -1);
+
+    }
+
+    /**
+     * Partition key for a Map_Keys row: application id and type, map name and
+     * key, followed by the given bucket number.
+     */
+    private ByteBuffer getMapKeyPartitionKey(MapScope scope, String key, int bucketNumber){
+
+        final UUID ownerUuid = scope.getApplication().getUuid();
+        final String ownerType = scope.getApplication().getType();
+
+        return serializeKeys(ownerUuid, ownerType, scope.getName(), key, bucketNumber);
+
+    }
}
diff --git a/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java b/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
index 41286ab..2a68247 100644
--- a/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
+++ b/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
@@ -82,6 +82,22 @@
assertEquals( value, returned );
}
+ /**
+  * Round-trips a string value through the map manager using a key that is
+  * well over 200 characters long, and checks the same value is read back.
+  */
+ @Test
+ public void writeReadStringWithLongKey() {
+     final String longKey = "key1234567890123456789012345678901234567890123456789012345678901234567890" +
+ "123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890" +
+ "123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890";
+     final String expected = "value";
+
+     final MapManager mapManager = mmf.createMapManager( this.scope );
+     mapManager.putString( longKey, expected );
+
+     // read straight back through the same manager
+     assertEquals( expected, mapManager.getString( longKey ) );
+ }
+
@Test
public void multiReadNoKey() {
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index 8708444..fe976bc 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -80,7 +80,8 @@
<commons.collections.version>3.2.1</commons.collections.version>
<commons.io.version>2.4</commons.io.version>
<commons.lang.version>3.1</commons.lang.version>
- <elasticsearch.version>1.7.3</elasticsearch.version>
+ <datastax.version>2.1.9</datastax.version>
+ <elasticsearch.version>1.4.4</elasticsearch.version>
<fasterxml-uuid.version>3.1.3</fasterxml-uuid.version>
<guava.version>18.0</guava.version>
<guice.version>4.0-beta5</guice.version>
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 3ee5f6f..e64db83 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -29,7 +29,7 @@
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Optional;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.index.*;
import org.apache.usergrid.persistence.model.field.*;
import org.junit.Before;
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
index eb6651f..3d68fe1 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
@@ -25,7 +25,7 @@
import java.util.*;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.index.*;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.junit.Before;
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
index a6cef60..afaebb7 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
@@ -24,7 +24,7 @@
import java.util.concurrent.atomic.AtomicLong;
import org.apache.usergrid.StressTest;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.index.*;
import org.junit.After;
import org.junit.Before;
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/TestIndexIdentifier.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/TestIndexIdentifier.java
index 2cc7e62..ffd25f3 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/TestIndexIdentifier.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/TestIndexIdentifier.java
@@ -20,9 +20,8 @@
package org.apache.usergrid.persistence.index.impl;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.util.StringUtils;
import org.apache.usergrid.persistence.index.IndexAlias;
import org.apache.usergrid.persistence.index.IndexFig;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
diff --git a/stack/corepersistence/queryindex/src/test/resources/dynamic-test.properties b/stack/corepersistence/queryindex/src/test/resources/dynamic-test.properties
index ed3209c..7b869af 100644
--- a/stack/corepersistence/queryindex/src/test/resources/dynamic-test.properties
+++ b/stack/corepersistence/queryindex/src/test/resources/dynamic-test.properties
@@ -1,12 +1,10 @@
# The properties are not the actual configuration properties but
# safe dynamic property defaults for our testing via IDE or Maven
-cassandra.connections=10
+cassandra.connections=50
cassandra.port=9160
-cassandra.version=1.2
cassandra.hosts=localhost
cassandra.cluster_name=Usergrid
collections.keyspace=Usergrid_Collections
-cassandra.timeout=5000
index.query.limit.default=10
elasticsearch.indexname=QueryIndexTests
diff --git a/stack/corepersistence/queryindex/src/test/resources/usergrid-CHOP.properties b/stack/corepersistence/queryindex/src/test/resources/usergrid-CHOP.properties
index e0e71c9..ff4f3bb 100644
--- a/stack/corepersistence/queryindex/src/test/resources/usergrid-CHOP.properties
+++ b/stack/corepersistence/queryindex/src/test/resources/usergrid-CHOP.properties
@@ -1,14 +1,12 @@
# These are for CHOP environment settings
-cassandra.connections=20
+cassandra.connections=50
cassandra.port=9160
-cassandra.version=1.2
# a comma delimited private IP address list to your chop cassandra cluster
# define this in your settings.xml and have it as an always active profile
cassandra.hosts=${chop.cassandra.hosts}
cassandra.cluster_name=Usergrid
collections.keyspace=Usergrid_Collections
-cassandra.timeout=5000
index.query.limit.default=10
diff --git a/stack/corepersistence/queryindex/src/test/resources/usergrid-UNIT.properties b/stack/corepersistence/queryindex/src/test/resources/usergrid-UNIT.properties
index ccff66a..3c853ac 100644
--- a/stack/corepersistence/queryindex/src/test/resources/usergrid-UNIT.properties
+++ b/stack/corepersistence/queryindex/src/test/resources/usergrid-UNIT.properties
@@ -3,10 +3,8 @@
cassandra.embedded=true
cassandra.hosts=127.0.0.1
cassandra.port=9160
-cassandra.version=1.2
cassandra.cluster_name=Usergrid
-cassandra.connections=20
-cassandra.timeout=5000
+cassandra.connections=50
collections.keyspace=Usergrid_Collections
collections.keyspace.strategy.options=replication_factor:1
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 0be5bd0..cca9c44 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -32,7 +32,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
import org.apache.usergrid.persistence.queue.Queue;
diff --git a/stack/pom.xml b/stack/pom.xml
index 3d14ad7..5186a13 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -465,6 +465,12 @@
<artifactId>snappy-java</artifactId>
<groupId>org.xerial.snappy</groupId>
</exclusion>
+
+ <exclusion>
+ <groupId>net.jpountz.lz4</groupId>
+ <artifactId>*</artifactId>
+
+ </exclusion>
</exclusions>
</dependency>
diff --git a/stack/rest/src/test/resources/usergrid-custom-test.properties b/stack/rest/src/test/resources/usergrid-custom-test.properties
index d845fcc..4b4b165 100644
--- a/stack/rest/src/test/resources/usergrid-custom-test.properties
+++ b/stack/rest/src/test/resources/usergrid-custom-test.properties
@@ -16,8 +16,7 @@
# REST module test properties
cassandra.startup=external
-cassandra.timeout=2000
-cassandra.connections=800
+cassandra.connections=50
elasticsearch.startup=external
diff --git a/stack/services/src/test/resources/usergrid-custom-test.properties b/stack/services/src/test/resources/usergrid-custom-test.properties
index 9a800c9..7524b1c 100644
--- a/stack/services/src/test/resources/usergrid-custom-test.properties
+++ b/stack/services/src/test/resources/usergrid-custom-test.properties
@@ -16,8 +16,7 @@
# with ug.heapmax=5000m and ug.heapmin=3000m (set in Maven settings.xml)
cassandra.startup=external
elasticsearch.startup=external
-cassandra.timeout=2000
-cassandra.connections=1000
+cassandra.connections=50
#Poll interval to check for new jobs in millseconds. 10 milliseconds for testing
usergrid.scheduler.job.interval=100
diff --git a/stack/test-utils/pom.xml b/stack/test-utils/pom.xml
index 84b633d..5f1b08d 100644
--- a/stack/test-utils/pom.xml
+++ b/stack/test-utils/pom.xml
@@ -101,6 +101,10 @@
<artifactId>snappy-java</artifactId>
<groupId>org.xerial.snappy</groupId>
</exclusion>
+ <exclusion>
+ <groupId>net.jpountz.lz4</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
</exclusions>
</dependency>
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java b/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java
index 68f366b..40cedcd 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java
@@ -20,9 +20,9 @@
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
+import com.datastax.driver.core.ConsistencyLevel;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.model.Column;
-import com.netflix.astyanax.model.ConsistencyLevel;
import com.netflix.astyanax.util.RangeBuilder;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManager;
@@ -165,7 +165,7 @@
//do stuff w/o read repair
UniqueValueSet uniqueValueSet = uniqueValueSerializationStrategy.load(
new ApplicationScopeImpl( new SimpleId(appToFilter, "application" ) ),
- ConsistencyLevel.valueOf(System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM")), entityType,
+ ConsistencyLevel.valueOf(System.getProperty("usergrid.read.cl", "LOCAL_QUORUM")), entityType,
Collections.singletonList(new StringField( fieldType, entityName) ), false);
StringBuilder stringBuilder = new StringBuilder();
@@ -199,7 +199,7 @@
try {
rows = keyspace.prepareQuery(CF_UNIQUE_VALUES)
- .setConsistencyLevel(ConsistencyLevel.valueOf(System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM")))
+ .setConsistencyLevel(com.netflix.astyanax.model.ConsistencyLevel.valueOf(System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM")))
.getAllRows()
.withColumnRange(new RangeBuilder().setLimit(1000).build())
.execute().getResult().iterator();