blob: 88603ac91153adff5d49898cb001375c60b05ae2 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.usergrid.persistence.cassandra;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.entities.Event;
import org.apache.commons.lang.StringUtils;
import org.apache.usergrid.count.Batcher;
import org.apache.usergrid.count.common.Count;
import me.prettyprint.cassandra.serializers.PrefixedSerializer;
import me.prettyprint.hector.api.beans.HCounterColumn;
import me.prettyprint.hector.api.mutation.Mutator;
import static me.prettyprint.hector.api.factory.HFactory.createColumn;
import static me.prettyprint.hector.api.factory.HFactory.createCounterColumn;
import static org.apache.usergrid.persistence.Schema.DICTIONARY_COUNTERS;
import static org.apache.usergrid.persistence.cassandra.ApplicationCF.APPLICATION_AGGREGATE_COUNTERS;
import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COUNTERS;
import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_DICTIONARIES;
import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addInsertToMutator;
import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
import static org.apache.usergrid.utils.ConversionUtils.bytebuffer;
import static org.apache.usergrid.persistence.cassandra.Serializers.*;
import org.apache.usergrid.persistence.index.query.CounterResolution;
public class CounterUtils {
public static final Logger logger = LoggerFactory.getLogger( CounterUtils.class );
private String counterType = "o";
private Batcher batcher;
public void setBatcher( Batcher batcher ) {
this.batcher = batcher;
/** Set the type to 'new' ("n"), 'parallel' ("p"), 'old' ("o" - the default) If not one of the above, do nothing */
public void setCounterType( String counterType ) {
if ( counterType == null ) {
if ( "n".equals( counterType ) || "p".equals( counterType ) || "o".equals( counterType ) ) {
this.counterType = counterType;
public boolean getIsCounterBatched() {
return "n".equals( counterType );
public static class AggregateCounterSelection {
public static final String COLON = ":";
public static final String STAR = "*";
String name;
UUID userId;
UUID groupId;
UUID queueId;
String category;
public AggregateCounterSelection( String name, UUID userId, UUID groupId, UUID queueId, String category ) { = name.toLowerCase();
this.userId = userId;
this.groupId = groupId;
this.queueId = queueId;
this.category = category;
public void apply( String name, UUID userId, UUID groupId, UUID queueId, String category ) { = name.toLowerCase();
this.userId = userId;
this.groupId = groupId;
this.queueId = queueId;
this.category = category;
public String getName() {
return name;
public void setName( String name ) { = name;
public UUID getUserId() {
return userId;
public void setUserId( UUID userId ) {
this.userId = userId;
public UUID getGroupId() {
return groupId;
public void setGroupId( UUID groupId ) {
this.groupId = groupId;
public UUID getQueueId() {
return queueId;
public void setQueueId( UUID queueId ) {
this.queueId = queueId;
public String getCategory() {
return category;
public void setCategory( String category ) {
this.category = category;
public String getRow( CounterResolution resolution ) {
return rowBuilder( name, userId, groupId, queueId, category, resolution );
public static String rowBuilder( String name, UUID userId, UUID groupId, UUID queueId, String category,
CounterResolution resolution ) {
StringBuilder builder = new StringBuilder( name );
builder.append( COLON ).append( ( userId != null ? userId.toString() : STAR ) ).append( COLON )
.append( groupId != null ? groupId.toString() : STAR ).append( COLON )
.append( ( queueId != null ? queueId.toString() : STAR ) ).append( COLON )
.append( ( category != null ? category : STAR ) ).append( COLON ).append( );
return builder.toString();
public void addEventCounterMutations( Mutator<ByteBuffer> m, UUID applicationId, Event event, long timestamp ) {
if ( event.getCounters() != null ) {
for ( Entry<String, Integer> value : event.getCounters().entrySet() ) {
batchIncrementAggregateCounters( m, applicationId, event.getUser(), event.getGroup(), null,
event.getCategory(), value.getKey().toLowerCase(), value.getValue(), event.getTimestamp(),
timestamp );
public void addMessageCounterMutations( Mutator<ByteBuffer> m, UUID applicationId, UUID queueId, Message msg,
long timestamp ) {
if ( msg.getCounters() != null ) {
for ( Entry<String, Integer> value : msg.getCounters().entrySet() ) {
batchIncrementAggregateCounters( m, applicationId, null, null, queueId, msg.getCategory(),
value.getKey().toLowerCase(), value.getValue(), msg.getTimestamp(), timestamp );
public void batchIncrementAggregateCounters( Mutator<ByteBuffer> m, UUID applicationId, UUID userId, UUID groupId,
UUID queueId, String category, Map<String, Long> counters,
long timestamp ) {
if ( counters != null ) {
for ( Entry<String, Long> value : counters.entrySet() ) {
batchIncrementAggregateCounters( m, applicationId, userId, groupId, queueId, category,
value.getKey().toLowerCase(), value.getValue(), timestamp );
public void batchIncrementAggregateCounters( Mutator<ByteBuffer> m, UUID applicationId, UUID userId, UUID groupId,
UUID queueId, String category, String name, long value,
long cassandraTimestamp ) {
batchIncrementAggregateCounters( m, applicationId, userId, groupId, queueId, category, name, value,
cassandraTimestamp / 1000, cassandraTimestamp );
public void batchIncrementAggregateCounters( Mutator<ByteBuffer> m, UUID applicationId, UUID userId, UUID groupId,
UUID queueId, String category, String name, long value,
long counterTimestamp, long cassandraTimestamp ) {
for ( CounterResolution resolution : CounterResolution.values() ) {
logger.debug( "BIAC for resolution {}", resolution );
batchIncrementAggregateCounters( m, userId, groupId, queueId, category, resolution, name, value,
counterTimestamp, applicationId );
logger.debug( "DONE BIAC for resolution {}", resolution );
batchIncrementEntityCounter( m, applicationId, name, value, cassandraTimestamp, applicationId );
if ( userId != null ) {
batchIncrementEntityCounter( m, userId, name, value, cassandraTimestamp, applicationId );
if ( groupId != null ) {
batchIncrementEntityCounter( m, groupId, name, value, cassandraTimestamp, applicationId );
private void batchIncrementAggregateCounters( Mutator<ByteBuffer> m, UUID userId, UUID groupId, UUID queueId,
String category, CounterResolution resolution, String name,
long value, long counterTimestamp, UUID applicationId ) {
String[] segments = StringUtils.split( name, '.' );
for ( int j = 0; j < segments.length; j++ ) {
name = StringUtils.join( segments, '.', 0, j + 1 );
// skip system counter
if ( "system".equals( name ) ) {
// *:*:*:*
handleAggregateCounterRow( m,
AggregateCounterSelection.rowBuilder( name, null, null, null, null, resolution ),
resolution.round( counterTimestamp ), value, applicationId );
String currentRow = null;
HashSet<String> rowSet = new HashSet<String>( 16 );
for ( int i = 0; i < 16; i++ ) {
boolean include_user = ( i & 0x01 ) != 0;
boolean include_group = ( i & 0x02 ) != 0;
boolean include_queue = ( i & 0x04 ) != 0;
boolean include_category = ( i & 0x08 ) != 0;
Object[] parameters = {
include_user ? userId : null, include_group ? groupId : null, include_queue ? queueId : null,
include_category ? category : null
int non_null = 0;
for ( Object p : parameters ) {
if ( p != null ) {
currentRow = AggregateCounterSelection
.rowBuilder( name, ( UUID ) parameters[0], ( UUID ) parameters[1], ( UUID ) parameters[2],
( String ) parameters[3], resolution );
if ( non_null > 0 && !rowSet.contains( currentRow ) ) {
rowSet.add( currentRow );
handleAggregateCounterRow( m, currentRow, resolution.round( counterTimestamp ), value,
applicationId );
private void handleAggregateCounterRow( Mutator<ByteBuffer> m, String key, long column, long value,
UUID applicationId ) {
if ( logger.isDebugEnabled() ) { "HACR: aggregateRow for app {} with key {} column {} and value {}",
new Object[] { applicationId, key, column, value } );
if ( "o".equals( counterType ) || "p".equals( counterType ) ) {
if ( m != null ) {
HCounterColumn<Long> c = createCounterColumn( column, value, le );
m.addCounter( bytebuffer( key ), APPLICATION_AGGREGATE_COUNTERS.toString(), c );
if ( "n".equals( counterType ) || "p".equals( counterType ) ) {
// create and add Count
PrefixedSerializer ps =
new PrefixedSerializer( applicationId, ue, se );
new Count( APPLICATION_AGGREGATE_COUNTERS.toString(), ps.toByteBuffer( key ), column, value ) );
public AggregateCounterSelection getAggregateCounterSelection( String name, UUID userId, UUID groupId, UUID queueId,
String category ) {
return new AggregateCounterSelection( name, userId, groupId, queueId, category );
public String getAggregateCounterRow( String name, UUID userId, UUID groupId, UUID queueId, String category,
CounterResolution resolution ) {
return AggregateCounterSelection.rowBuilder( name, userId, groupId, queueId, category, resolution );
public List<String> getAggregateCounterRows( List<AggregateCounterSelection> selections,
CounterResolution resolution ) {
List<String> keys = new ArrayList<String>();
for ( AggregateCounterSelection selection : selections ) {
keys.add( selection.getRow( resolution ) );
return keys;
private Mutator<ByteBuffer> batchIncrementEntityCounter( Mutator<ByteBuffer> m, UUID entityId, String name,
Long value, long timestamp, UUID applicationId ) {
if ( logger.isDebugEnabled() ) {
logger.debug( "BIEC: Incrementing property {} of entity {} by value {}",
new Object[] { name, entityId, value } );
addInsertToMutator( m, ENTITY_DICTIONARIES, key( entityId, DICTIONARY_COUNTERS ), name, null, timestamp );
if ( "o".equals( counterType ) || "p".equals( counterType ) ) {
HCounterColumn<String> c = createCounterColumn( name, value );
m.addCounter( bytebuffer( entityId ), ENTITY_COUNTERS.toString(), c );
if ( "n".equals( counterType ) || "p".equals( counterType ) ) {
PrefixedSerializer ps = new PrefixedSerializer( applicationId, ue, ue );
batcher.add( new Count( ENTITY_COUNTERS.toString(), ps.toByteBuffer( entityId ), name, value ) );
return m;
public Mutator<ByteBuffer> batchIncrementQueueCounter( Mutator<ByteBuffer> m, UUID queueId, String name, long value,
long timestamp, UUID applicationId ) {
if ( logger.isDebugEnabled() ) {
logger.debug( "BIQC: Incrementing property {} of queue {} by value {}",
new Object[] { name, queueId, value } );
m.addInsertion( bytebuffer( key( queueId, DICTIONARY_COUNTERS ).toString() ),
createColumn( name, ByteBuffer.allocate( 0 ), timestamp, se, be ) );
if ( "o".equals( counterType ) || "p".equals( counterType ) ) {
HCounterColumn<String> c = createCounterColumn( name, value );
ByteBuffer keybytes = bytebuffer( queueId );
m.addCounter( keybytes, QueuesCF.COUNTERS.toString(), c );
if ( "n".equals( counterType ) || "p".equals( counterType ) ) {
PrefixedSerializer ps = new PrefixedSerializer( applicationId, ue, ue );
batcher.add( new Count( QueuesCF.COUNTERS.toString(), ps.toByteBuffer( queueId ), name, value ) );
return m;
public Mutator<ByteBuffer> batchIncrementQueueCounters( Mutator<ByteBuffer> m, UUID queueId,
Map<String, Long> values, long timestamp,
UUID applicationId ) {
for ( Entry<String, Long> entry : values.entrySet() ) {
batchIncrementQueueCounter( m, queueId, entry.getKey(), entry.getValue(), timestamp, applicationId );
return m;
public Mutator<ByteBuffer> batchIncrementQueueCounters( Mutator<ByteBuffer> m, Map<UUID, Map<String, Long>> values,
long timestamp, UUID applicationId ) {
for ( Entry<UUID, Map<String, Long>> entry : values.entrySet() ) {
batchIncrementQueueCounters( m, entry.getKey(), entry.getValue(), timestamp, applicationId );
return m;