blob: 45c97842e4a97ff0ebd35121c36335d0ef65c0e8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.usergrid.persistence.cassandra;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.thrift.ColumnDef;
import org.apache.cassandra.thrift.IndexType;
import org.apache.commons.lang.StringUtils;
import org.apache.usergrid.utils.JsonUtils;
import com.fasterxml.jackson.databind.JsonNode;
import me.prettyprint.cassandra.service.ThriftColumnDef;
import me.prettyprint.hector.api.ClockResolution;
import me.prettyprint.hector.api.beans.DynamicComposite;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.ddl.ColumnDefinition;
import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
import me.prettyprint.hector.api.ddl.ComparatorType;
import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.MutationResult;
import me.prettyprint.hector.api.mutation.Mutator;
import static java.nio.ByteBuffer.wrap;
import static me.prettyprint.hector.api.factory.HFactory.createClockResolution;
import static me.prettyprint.hector.api.factory.HFactory.createColumn;
import static org.apache.commons.beanutils.MethodUtils.invokeStaticMethod;
import static org.apache.commons.lang.StringUtils.removeEnd;
import static org.apache.commons.lang.StringUtils.removeStart;
import static org.apache.commons.lang.StringUtils.split;
import static org.apache.commons.lang.StringUtils.substringAfterLast;
import static org.apache.usergrid.persistence.Schema.PROPERTY_TYPE;
import static org.apache.usergrid.persistence.Schema.PROPERTY_UUID;
import static org.apache.usergrid.persistence.Schema.serializeEntityProperty;
import static org.apache.usergrid.persistence.cassandra.Serializers.be;
import static org.apache.usergrid.utils.ClassUtils.isBasicType;
import static org.apache.usergrid.utils.ConversionUtils.bytebuffer;
import static org.apache.usergrid.utils.JsonUtils.toJsonNode;
import static org.apache.usergrid.utils.StringUtils.replaceAll;
import static org.apache.usergrid.utils.StringUtils.stringOrSubstringBeforeFirst;
/** @author edanuff */
public class CassandraPersistenceUtils {
private static final Logger logger = LoggerFactory.getLogger( CassandraPersistenceUtils.class );
/** Logger for batch operations */
private static final Logger batch_logger =
LoggerFactory.getLogger( CassandraPersistenceUtils.class.getPackage().getName() + ".BATCH" );
/**
*
*/
public static final ByteBuffer PROPERTY_TYPE_AS_BYTES = bytebuffer( PROPERTY_TYPE );
/**
*
*/
public static final ByteBuffer PROPERTY_ID_AS_BYTES = bytebuffer( PROPERTY_UUID );
/**
*
*/
public static final char KEY_DELIM = ':';
/**
*
*/
public static final UUID NULL_ID = new UUID( 0, 0 );
/**
* @param operation
* @param columnFamily
* @param key
* @param columnName
* @param columnValue
* @param timestamp
*/
public static void logBatchOperation( String operation, Object columnFamily, Object key, Object columnName,
Object columnValue, long timestamp ) {
if ( batch_logger.isDebugEnabled() ) {
batch_logger.debug( "{} cf={} key={} name={} value={}",
new Object[] { operation, columnFamily, key, columnName, columnValue } );
}
}
public static void addInsertToMutator( Mutator<ByteBuffer> m, Object columnFamily, Object key, Object columnName,
Object columnValue, long timestamp ) {
logBatchOperation( "Insert", columnFamily, key, columnName, columnValue, timestamp );
if ( columnName instanceof List<?> ) {
columnName = DynamicComposite.toByteBuffer( ( List<?> ) columnName );
}
if ( columnValue instanceof List<?> ) {
columnValue = DynamicComposite.toByteBuffer( ( List<?> ) columnValue );
}
HColumn<ByteBuffer, ByteBuffer> column =
createColumn( bytebuffer( columnName ), bytebuffer( columnValue ), timestamp, be, be );
m.addInsertion( bytebuffer( key ), columnFamily.toString(), column );
}
public static void addInsertToMutator( Mutator<ByteBuffer> m, Object columnFamily, Object key, Map<?, ?> columns,
long timestamp ) throws Exception {
for ( Entry<?, ?> entry : columns.entrySet() ) {
addInsertToMutator( m, columnFamily, key, entry.getKey(), entry.getValue(), timestamp );
}
}
public static void addPropertyToMutator( Mutator<ByteBuffer> m, Object key, String entityType, String propertyName,
Object propertyValue, long timestamp ) {
logBatchOperation( "Insert", ApplicationCF.ENTITY_PROPERTIES, key, propertyName, propertyValue, timestamp );
HColumn<ByteBuffer, ByteBuffer> column = createColumn( bytebuffer( propertyName ),
serializeEntityProperty( entityType, propertyName, propertyValue ), timestamp, be, be );
m.addInsertion( bytebuffer( key ), ApplicationCF.ENTITY_PROPERTIES.toString(), column );
}
public static void addPropertyToMutator( Mutator<ByteBuffer> m, Object key, String entityType,
Map<String, ?> columns, long timestamp ) throws Exception {
for ( Entry<String, ?> entry : columns.entrySet() ) {
addPropertyToMutator( m, key, entityType, entry.getKey(), entry.getValue(), timestamp );
}
}
/** Delete the row */
public static void addDeleteToMutator( Mutator<ByteBuffer> m, Object columnFamily, Object key, long timestamp )
throws Exception {
logBatchOperation( "Delete", columnFamily, key, null, null, timestamp );
m.addDeletion( bytebuffer( key ), columnFamily.toString(), timestamp );
}
public static void addDeleteToMutator( Mutator<ByteBuffer> m, Object columnFamily, Object key, Object columnName,
long timestamp ) throws Exception {
logBatchOperation( "Delete", columnFamily, key, columnName, null, timestamp );
if ( columnName instanceof List<?> ) {
columnName = DynamicComposite.toByteBuffer( ( List<?> ) columnName );
}
m.addDeletion( bytebuffer( key ), columnFamily.toString(), bytebuffer( columnName ), be, timestamp );
}
public static void addDeleteToMutator( Mutator<ByteBuffer> m, Object columnFamily, Object key, long timestamp,
Object... columnNames ) throws Exception {
for ( Object columnName : columnNames ) {
logBatchOperation( "Delete", columnFamily, key, columnName, null, timestamp );
if ( columnName instanceof List<?> ) {
columnName = DynamicComposite.toByteBuffer( ( List<?> ) columnName );
}
m.addDeletion( bytebuffer( key ), columnFamily.toString(), bytebuffer( columnName ), be, timestamp );
}
}
public static Map<String, ByteBuffer> getColumnMap( List<HColumn<String, ByteBuffer>> columns ) {
Map<String, ByteBuffer> column_map = new TreeMap<String, ByteBuffer>( String.CASE_INSENSITIVE_ORDER );
if ( columns != null ) {
for ( HColumn<String, ByteBuffer> column : columns ) {
String column_name = column.getName();
column_map.put( column_name, column.getValue() );
}
}
return column_map;
}
public static <K, V> Map<K, V> asMap( List<HColumn<K, V>> columns ) {
if ( columns == null ) {
return null;
}
Map<K, V> column_map = new LinkedHashMap<K, V>();
for ( HColumn<K, V> column : columns ) {
K column_name = column.getName();
column_map.put( column_name, column.getValue() );
}
return column_map;
}
public static List<ByteBuffer> getAsByteKeys( List<UUID> ids ) {
List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
for ( UUID id : ids ) {
keys.add( bytebuffer( key( id ) ) );
}
return keys;
}
/** @return timestamp value for current time */
public static long createTimestamp() {
return createClockResolution( ClockResolution.MICROSECONDS ).createClock();
}
/** @return normalized group path */
public static String normalizeGroupPath( String path ) {
path = replaceAll( path.toLowerCase().trim(), "//", "/" );
path = removeStart( path, "/" );
path = removeEnd( path, "/" );
return path;
}
/** @return a composite key */
public static Object key( Object... objects ) {
if ( objects.length == 1 ) {
Object obj = objects[0];
if ( ( obj instanceof UUID ) || ( obj instanceof ByteBuffer ) ) {
return obj;
}
}
StringBuilder s = new StringBuilder();
for ( Object obj : objects ) {
if ( obj instanceof String ) {
s.append( ( ( String ) obj ).toLowerCase() );
}
else if ( obj instanceof List<?> ) {
s.append( key( ( ( List<?> ) obj ).toArray() ) );
}
else if ( obj instanceof Object[] ) {
s.append( key( ( Object[] ) obj ) );
}
else if ( obj != null ) {
s.append( obj );
}
else {
s.append( "*" );
}
s.append( KEY_DELIM );
}
s.deleteCharAt( s.length() - 1 );
return s.toString();
}
/** @return UUID for composite key */
public static UUID keyID( Object... objects ) {
if ( objects.length == 1 ) {
Object obj = objects[0];
if ( obj instanceof UUID ) {
return ( UUID ) obj;
}
}
String keyStr = key( objects ).toString();
if ( keyStr.length() == 0 ) {
return NULL_ID;
}
UUID uuid = UUID.nameUUIDFromBytes( keyStr.getBytes() ); //UUIDUtils.newTimeUUID(); //UUID.nameUUIDFromBytes( keyStr.getBytes() );
logger.debug( "Key {} equals UUID {}", keyStr, uuid );
return uuid;
}
/** @return UUID for entity alias */
public static UUID aliasID( UUID ownerId, String aliasType, String alias ) {
return keyID( ownerId, aliasType, alias );
}
public static Mutator<ByteBuffer> buildSetIdListMutator( Mutator<ByteBuffer> batch, UUID targetId,
String columnFamily, String keyPrefix, String keySuffix,
List<UUID> keyIds, long timestamp ) throws Exception {
for ( UUID keyId : keyIds ) {
ByteBuffer key = null;
if ( ( StringUtils.isNotEmpty( keyPrefix ) ) || ( StringUtils.isNotEmpty( keySuffix ) ) ) {
key = bytebuffer( keyPrefix + keyId.toString() + keySuffix );
}
else {
key = bytebuffer( keyId );
}
addInsertToMutator( batch, columnFamily, key, targetId, ByteBuffer.allocate( 0 ), timestamp );
}
return batch;
}
public static MutationResult batchExecute( Mutator<?> m, int retries ) {
return m.execute();
}
public static Object toStorableValue( Object obj ) {
if ( obj == null ) {
return null;
}
if ( isBasicType( obj.getClass() ) ) {
return obj;
}
if ( obj instanceof ByteBuffer ) {
return obj;
}
JsonNode json = toJsonNode( obj );
if ( ( json != null ) && json.isValueNode() ) {
if ( json.isBigInteger() ) {
return json.asInt();
}
else if ( json.isNumber() || json.isBoolean() ) {
return BigInteger.valueOf( json.asLong() );
}
else if ( json.isTextual() ) {
return json.asText();
}
else if ( json.isBinary() ) {
try {
return wrap( json.binaryValue() );
}
catch ( IOException e ) {
}
}
}
return json;
}
public static ByteBuffer toStorableBinaryValue( Object obj ) {
obj = toStorableValue( obj );
if ( obj instanceof JsonNode ) {
return JsonUtils.toByteBuffer( obj );
}
else {
return bytebuffer( obj );
}
}
public static ByteBuffer toStorableBinaryValue( Object obj, boolean forceJson ) {
obj = toStorableValue( obj );
if ( ( obj instanceof JsonNode ) || ( forceJson && ( obj != null ) && !( obj instanceof ByteBuffer ) ) ) {
return JsonUtils.toByteBuffer( obj );
}
else {
return bytebuffer( obj );
}
}
public static List<ColumnDefinition> getIndexMetadata( String indexes ) {
if ( indexes == null ) {
return null;
}
String[] index_entries = split( indexes, ',' );
List<ColumnDef> columns = new ArrayList<ColumnDef>();
for ( String index_entry : index_entries ) {
String column_name = stringOrSubstringBeforeFirst( index_entry, ':' ).trim();
String comparer = substringAfterLast( index_entry, ":" ).trim();
if ( StringUtils.isBlank( comparer ) ) {
comparer = "UUIDType";
}
if ( StringUtils.isNotBlank( column_name ) ) {
ColumnDef cd = new ColumnDef( bytebuffer( column_name ), comparer );
cd.setIndex_name( column_name );
cd.setIndex_type( IndexType.KEYS );
columns.add( cd );
}
}
return ThriftColumnDef.fromThriftList( columns );
}
public static List<ColumnFamilyDefinition> getCfDefs( Class<? extends CFEnum> cfEnum, String keyspace ) {
return getCfDefs( cfEnum, null, keyspace );
}
public static List<ColumnFamilyDefinition> getCfDefs( Class<? extends CFEnum> cfEnum,
List<ColumnFamilyDefinition> cf_defs, String keyspace ) {
if ( cf_defs == null ) {
cf_defs = new ArrayList<ColumnFamilyDefinition>();
}
CFEnum[] values = null;
try {
values = ( CFEnum[] ) invokeStaticMethod( cfEnum, "values", null);
}
catch ( Exception e ) {
logger.error( "Couldn't get CFEnum values", e );
}
if ( values == null ) {
return null;
}
for ( CFEnum cf : values ) {
if ( !cf.create() ) {
continue;
}
String defaultValidationClass = cf.getValidator();
List<ColumnDefinition> metadata = cf.getMetadata();
ColumnFamilyDefinition cf_def = HFactory.createColumnFamilyDefinition( keyspace, cf.getColumnFamily(),
ComparatorType.getByClassName( cf.getComparator() ), metadata );
if ( defaultValidationClass != null ) {
cf_def.setDefaultValidationClass( defaultValidationClass );
}
cf_defs.add( cf_def );
}
return cf_defs;
}
public static void validateKeyspace( CFEnum[] cf_enums, KeyspaceDefinition ksDef ) {
Map<String, ColumnFamilyDefinition> cfs = new HashMap<String, ColumnFamilyDefinition>();
for ( ColumnFamilyDefinition cf : ksDef.getCfDefs() ) {
cfs.put( cf.getName(), cf );
}
for ( CFEnum c : cf_enums ) {
if ( !cfs.keySet().contains( c.getColumnFamily() ) ) {
}
}
}
}