/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.apps.logstream;
import java.util.*;
import java.util.Map.Entry;
import javax.validation.ValidationException;
import javax.validation.constraints.NotNull;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.codec.KryoSerializableStreamCodec;
import org.apache.apex.malhar.lib.logs.DimensionObject;
import org.apache.commons.lang.mutable.MutableDouble;
import com.datatorrent.api.*;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Partitioner.PartitioningContext;
import com.datatorrent.apps.logstream.LogstreamUtil.AggregateOperation;
import com.datatorrent.apps.logstream.PropertyRegistry.LogstreamPropertyRegistry;
import com.datatorrent.apps.logstream.PropertyRegistry.PropertyRegistry;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.netlet.util.DTThrowable;
/**
* Partitionable dimension operator.
* Computes the specified aggregate operations on the specified values for the specified dimension combinations.
* Each partition works on the filtered tuples of a single log type.
*
* @since 0.9.4
*/
public class DimensionOperator extends BaseOperator implements Partitioner<DimensionOperator>
{
/**
* key: timebucket|timestamp|recordtype|filter|dimensionId|value.operationType
* value: DimensionObject
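* e.g. key m|201311230108|1|4|10|bytes.SUM maps to a DimensionObject holding the aggregate for that combination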
*/
public final transient DefaultOutputPort<Map<String, DimensionObject<String>>> aggregationsOutput = new DefaultOutputPort<Map<String, DimensionObject<String>>>()
{
@Override
public Unifier<Map<String, DimensionObject<String>>> getUnifier()
{
return new DimensionOperatorUnifier();
}
};
public final transient DefaultInputPort<Map<String, Object>> in = new DefaultInputPort<Map<String, Object>>()
{
@Override
public void process(Map<String, Object> tuple)
{
processTuple(tuple);
}
@Override
public StreamCodec<Map<String, Object>> getStreamCodec()
{
return new DimensionOperatorStreamCodec();
}
};
@NotNull
private PropertyRegistry<String> registry;
private static final Logger logger = LoggerFactory.getLogger(DimensionOperator.class);
private String timeKeyName;
private long windowWidth = 500;
private long currentWindowId;
private transient TimeZone timeZone = TimeZone.getTimeZone("GMT");
private transient Calendar calendar = new GregorianCalendar(timeZone);
private int timeBucketFlags;
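// aggregation cache: key (timebucket|timestamp|type|filter|dimId|value) -> dimension value name -> operation -> running aggregate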
private final Map<String, Map<String, Map<AggregateOperation, Number>>> cacheObject = new HashMap<String, Map<String, Map<AggregateOperation, Number>>>();
private final HashMap<String, Number> recordType = new HashMap<String, Number>();
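// log type id -> value key name -> set of aggregate operations to compute on that value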
private HashMap<Integer, HashMap<String, HashSet<AggregateOperation>>> valueOperations = new HashMap<Integer, HashMap<String, HashSet<AggregateOperation>>>();
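// log type id -> dimension combination ids registered in the property registry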
private HashMap<Integer, ArrayList<Integer>> dimensionCombinationList = new HashMap<Integer, ArrayList<Integer>>();
private transient boolean firstTuple = true;
private ArrayList<Integer> dimensionCombinations;
private HashMap<String, HashSet<AggregateOperation>> valueOperationTypes;
List<String> outTimeBuckets; // represents the list of timebuckets to be emitted when the time bucket changes
@Override
public void setup(OperatorContext context)
{
super.setup(context);
if (context != null) {
windowWidth = context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
}
LogstreamPropertyRegistry.setInstance(registry);
}
/**
* Performs dimensional computations for each incoming tuple and populates the cache with the results.
*
* @param tuple input tuple
*/
protected void processTuple(Map<String, Object> tuple)
{
long time;
if (timeKeyName != null) {
time = (Long)tuple.get(timeKeyName);
}
else {
time = LogstreamUtil.extractTime(currentWindowId, windowWidth);
}
List<String> timeBucketList = getTimeBucketList(time);
if (firstTuple) {
// populate record type
extractType(tuple);
outTimeBuckets = new ArrayList<String>(timeBucketList);
// create all dimension combinations if not specified by user
if (!dimensionCombinationList.containsKey(recordType.get(LogstreamUtil.LOG_TYPE))) {
createAllDimensionCombinations();
}
dimensionCombinations = dimensionCombinationList.get(recordType.get(LogstreamUtil.LOG_TYPE));
valueOperationTypes = valueOperations.get(recordType.get(LogstreamUtil.LOG_TYPE));
firstTuple = false;
}
// temporary validation to ensure that unexpected records do not appear in any partition
Number receivedLogType = (Number)tuple.get(LogstreamUtil.LOG_TYPE);
Number receivedFilter = (Number)tuple.get(LogstreamUtil.FILTER);
Number expectedLogType = recordType.get(LogstreamUtil.LOG_TYPE);
Number expectedFilter = recordType.get(LogstreamUtil.FILTER);
if (!receivedLogType.equals(expectedLogType) || !receivedFilter.equals(expectedFilter)) {
logger.error("Unexpected tuple");
logger.error("expected log type = {} received = {}", expectedLogType, receivedLogType);
logger.error("expected filter = {} received = {}", expectedFilter, receivedFilter);
}
else {
for (String timeBucket : timeBucketList) {
for (Integer dimensionCombinationId : dimensionCombinations) {
String dimensionCombination = registry.lookupValue(dimensionCombinationId);
String[] dimensions = dimensionCombination.split(":");
String dimValueName = "";
boolean isBadTuple = false;
for (String dimension : dimensions) {
Object dimVal = tuple.get(dimension);
if (dimVal == null) {
logger.error("dimension \"{}\" not found in tuple", dimension);
isBadTuple = true;
continue;
}
if (!dimValueName.isEmpty()) {
dimValueName += ",";
}
dimValueName += dimVal.toString();
}
if (!isBadTuple) {
for (Entry<String, HashSet<AggregateOperation>> entry : valueOperationTypes.entrySet()) {
String valueKeyName = entry.getKey();
Object value = tuple.get(valueKeyName);
Number numberValue = LogstreamUtil.extractNumber(value);
doComputations(timeBucket, dimensionCombinationId, dimValueName, valueKeyName, numberValue);
}
}
}
}
}
}
/**
* Returns the list of time buckets for the given timestamp.
*
* @param time timestamp in milliseconds
* @return list of time bucket strings, one per configured time bucket flag
*/
protected List<String> getTimeBucketList(long time)
{
calendar.setTimeInMillis(time);
List<String> timeBucketList = new ArrayList<String>();
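// each bucket is encoded as <bucket char>|<zero padded time fields>, e.g. D|20131123 for a day bucket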
if ((timeBucketFlags & LogstreamUtil.TIMEBUCKET_YEAR) != 0) {
timeBucketList.add(String.format("Y|%04d", calendar.get(Calendar.YEAR)));
}
if ((timeBucketFlags & LogstreamUtil.TIMEBUCKET_MONTH) != 0) {
timeBucketList.add(String.format("M|%04d%02d", calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1));
}
if ((timeBucketFlags & LogstreamUtil.TIMEBUCKET_WEEK) != 0) {
timeBucketList.add(String.format("W|%04d%02d", calendar.get(Calendar.YEAR), calendar.get(Calendar.WEEK_OF_YEAR)));
}
if ((timeBucketFlags & LogstreamUtil.TIMEBUCKET_DAY) != 0) {
timeBucketList.add(String.format("D|%04d%02d%02d", calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH)));
}
if ((timeBucketFlags & LogstreamUtil.TIMEBUCKET_HOUR) != 0) {
timeBucketList.add(String.format("h|%04d%02d%02d%02d", calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH), calendar.get(Calendar.HOUR_OF_DAY)));
}
if ((timeBucketFlags & LogstreamUtil.TIMEBUCKET_MINUTE) != 0) {
timeBucketList.add(String.format("m|%04d%02d%02d%02d%02d", calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH), calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE)));
}
if ((timeBucketFlags & LogstreamUtil.TIMEBUCKET_SECOND) != 0) {
timeBucketList.add(String.format("s|%04d%02d%02d%02d%02d%02d", calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH), calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND)));
}
return timeBucketList;
}
/**
* Does computations for the given dimension and its value names on the given value key name
*
* @param timeBucket time bucket
* @param dimensionCombinationId id of dimension combination
* @param dimValueName values of the dimension combination
* @param valueKeyName name of the value key on which operations are performed
* @param value value of the value key
*/
private void doComputations(String timeBucket, Integer dimensionCombinationId, String dimValueName, String valueKeyName, Number value)
{
StringBuilder sb = new StringBuilder();
sb.append(timeBucket).append("|").append(recordType.get(LogstreamUtil.LOG_TYPE)).append("|").append(recordType.get(LogstreamUtil.FILTER)).append("|").append(dimensionCombinationId).append("|").append(valueKeyName);
//final key format --> timebucket|type|filter|dimId|val
//eg: m|201311230108|1|4|10|bytes
String key = sb.toString();
Map<AggregateOperation, Number> aggregations;
if (cacheObject.containsKey(key)) {
Map<String, Map<AggregateOperation, Number>> dimValueNames = cacheObject.get(key);
if (dimValueNames.containsKey(dimValueName)) {
aggregations = dimValueNames.get(dimValueName);
}
else {
aggregations = new HashMap<AggregateOperation, Number>();
for (AggregateOperation aggregationType : valueOperationTypes.get(valueKeyName)) {
aggregations.put(aggregationType, new MutableDouble(0));
}
dimValueNames.put(dimValueName, aggregations);
}
}
else {
Map<String, Map<AggregateOperation, Number>> newDimValueNames = new HashMap<String, Map<AggregateOperation, Number>>();
aggregations = new HashMap<AggregateOperation, Number>();
for (AggregateOperation aggregationType : valueOperationTypes.get(valueKeyName)) {
aggregations.put(aggregationType, new MutableDouble(0));
}
newDimValueNames.put(dimValueName, aggregations);
cacheObject.put(key, newDimValueNames);
}
if (aggregations.containsKey(AggregateOperation.SUM)) {
MutableDouble aggrVal = (MutableDouble)aggregations.get(AggregateOperation.SUM);
aggrVal.add(value);
}
if (aggregations.containsKey(AggregateOperation.COUNT)) {
MutableDouble aggrVal = (MutableDouble)aggregations.get(AggregateOperation.COUNT);
aggrVal.add(1);
}
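// incremental average: COUNT is always maintained alongside AVERAGE (see addPropertiesFromString),
// and was incremented above, so the previous count is countVal - 1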
if (aggregations.containsKey(AggregateOperation.AVERAGE)) {
double avgVal = aggregations.get(AggregateOperation.AVERAGE).doubleValue();
double countVal = aggregations.get(AggregateOperation.COUNT).doubleValue();
double newAvg = ((avgVal * (countVal - 1)) + value.doubleValue()) / countVal;
aggregations.put(AggregateOperation.AVERAGE, new MutableDouble(newAvg));
}
}
@Override
public void beginWindow(long windowId)
{
super.beginWindow(windowId);
currentWindowId = windowId;
}
@Override
public void endWindow()
{
if (outTimeBuckets == null || outTimeBuckets.isEmpty()) {
return;
}
long time = LogstreamUtil.extractTime(currentWindowId, windowWidth);
// get time buckets for current window id
List<String> timeBucketList = getTimeBucketList(time);
// determine which time buckets have rolled over; emit those and replace them in outTimeBuckets with the current bucket
ArrayList<String> emitTimeBucketList = new ArrayList<String>();
for (int i = 0; i < timeBucketList.size(); i++) {
String timeBucket = timeBucketList.get(i);
if (!timeBucket.equals(outTimeBuckets.get(i))) {
emitTimeBucketList.add(outTimeBuckets.get(i));
outTimeBuckets.set(i, timeBucket);
}
}
// emit the computations for each time bucket in emitTimeBucketList and remove those buckets from the cache since they have been processed
if (!emitTimeBucketList.isEmpty()) {
ArrayList<String> obsoleteKeys = new ArrayList<String>();
for (String outTimeStr : emitTimeBucketList) {
HashMap<String, DimensionObject<String>> outputAggregationsObject;
for (Entry<String, Map<String, Map<AggregateOperation, Number>>> keys : cacheObject.entrySet()) {
String key = keys.getKey();
if (key.startsWith(outTimeStr)) {
Map<String, Map<AggregateOperation, Number>> dimValues = keys.getValue();
for (Entry<String, Map<AggregateOperation, Number>> dimValue : dimValues.entrySet()) {
String dimValueName = dimValue.getKey();
Map<AggregateOperation, Number> operations = dimValue.getValue();
outputAggregationsObject = new HashMap<String, DimensionObject<String>>();
for (Entry<AggregateOperation, Number> operation : operations.entrySet()) {
AggregateOperation aggrOperationType = operation.getKey();
Number aggr = operation.getValue();
String outKey = key + "." + aggrOperationType.name();
DimensionObject<String> outDimObj = new DimensionObject<String>((MutableDouble)aggr, dimValueName);
outputAggregationsObject.put(outKey, outDimObj);
}
aggregationsOutput.emit(outputAggregationsObject);
}
// remove emitted key
obsoleteKeys.add(key);
}
}
for (String key : obsoleteKeys) {
cacheObject.remove(key);
}
}
}
}
/**
* Supplies the registry object, which is used to store and retrieve meta information about each tuple.
*
* @param registry property registry
*/
public void setRegistry(PropertyRegistry<String> registry)
{
this.registry = registry;
}
/**
* Supply the properties to the operator.
* The properties include type, timebucket, dimension combinations, values and the operations on them.
* Input includes the following properties:
* type=logtype // input log type for which the properties are to be set
* timebucket=time bucket character // one of s (second), m (minute), h (hour), D (day), W (week), M (month), Y (year)
* dimensions=a:b:c // colon separated dimension combination for which computations are expected
* values=value.metric[:value.metric] // list of dot concatenated value name and metric pairs, separated by colons
* eg: type=apache,timebucket=m,timebucket=h,dimensions=a:b:c,dimensions=b:c,dimensions=b,dimensions=d,values=x.sum:y.sum:y.avg
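*
* A minimal usage sketch (the property strings and the registry variable here are illustrative):
* <pre>
* DimensionOperator oper = new DimensionOperator();
* oper.setRegistry(registry); // a previously populated PropertyRegistry
* oper.addPropertiesFromString(new String[]{"type=apache", "timebucket=m", "dimensions=a:b", "values=x.sum:y.avg"});
* </pre>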
*
* @param properties
*/
public void addPropertiesFromString(String[] properties)
{
try {
ArrayList<Integer> dimCombinations = new ArrayList<Integer>();
HashMap<String, HashSet<AggregateOperation>> valOpTypes = new HashMap<String, HashSet<AggregateOperation>>();
String type = null;
// user input example:
// type=apache,timebucket=m,timebucket=h,dimensions=a:b:c,dimensions=b:c,dimensions=b,dimensions=d,values=x.sum:y.sum:y.avg
for (String inputs : properties) {
String[] split = inputs.split("=", 2);
if (split[0].equalsIgnoreCase("timebucket")) {
int timeBucket = LogstreamUtil.extractTimeBucket(split[1]);
if (timeBucket == 0) {
logger.error("invalid time bucket {}", split[1]);
}
timeBucketFlags |= timeBucket;
}
else if (split[0].equalsIgnoreCase("values")) {
String[] values = split[1].split(":");
for (String value : values) {
String[] valueNames = value.split("\\.");
String valueName = valueNames[0];
String valueType = valueNames[1];
if (valueType.equalsIgnoreCase("sum")) {
if (valOpTypes.containsKey(valueName)) {
valOpTypes.get(valueName).add(AggregateOperation.SUM);
}
else {
HashSet<AggregateOperation> valueTypeList = new HashSet<AggregateOperation>();
valueTypeList.add(AggregateOperation.SUM);
valOpTypes.put(valueName, valueTypeList);
}
}
else if (valueType.equals("avg") || valueType.equals("average")) {
if (valOpTypes.containsKey(valueName)) {
valOpTypes.get(valueName).add(AggregateOperation.AVERAGE);
valOpTypes.get(valueName).add(AggregateOperation.COUNT);
}
else {
HashSet<AggregateOperation> valueTypeList = new HashSet<AggregateOperation>();
valueTypeList.add(AggregateOperation.AVERAGE);
valueTypeList.add(AggregateOperation.COUNT);
valOpTypes.put(valueName, valueTypeList);
}
}
else if (valueType.equals("count")) {
if (valOpTypes.containsKey(valueName)) {
valOpTypes.get(valueName).add(AggregateOperation.COUNT);
}
else {
HashSet<AggregateOperation> valueTypeList = new HashSet<AggregateOperation>();
valueTypeList.add(AggregateOperation.COUNT);
valOpTypes.put(valueName, valueTypeList);
}
}
}
}
else if (split[0].equalsIgnoreCase("type")) {
type = split[1];
}
else if (split[0].equalsIgnoreCase("dimensions")) {
// dimensions
String dimensions = split[1];
int dim = registry.bind("DIMENSION", dimensions);
dimCombinations.add(dim);
}
else {
throw new ValidationException("Invalid input property string " + Arrays.toString(properties));
}
}
dimensionCombinationList.put(registry.getIndex(LogstreamUtil.LOG_TYPE, type), dimCombinations);
valueOperations.put(registry.getIndex(LogstreamUtil.LOG_TYPE, type), valOpTypes);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* <b>Note:</b> This partitioner does not support parallel partitioning.<br/><br/>
* {@inheritDoc}
*/
@Override
public Collection<Partition<DimensionOperator>> definePartitions(Collection<Partition<DimensionOperator>> partitions, PartitioningContext context)
{
ArrayList<Partition<DimensionOperator>> newPartitions = new ArrayList<Partition<DimensionOperator>>();
String[] filters = registry.list(LogstreamUtil.FILTER);
int partitionSize;
if (partitions.size() == 1) {
// initial partitions; functional partitioning
partitionSize = filters.length;
}
else {
// redo partitions; double the partitions
partitionSize = partitions.size() * 2;
}
for (int i = 0; i < partitionSize; i++) {
try {
DimensionOperator dimensionOperator = new DimensionOperator();
dimensionOperator.registry = registry;
dimensionOperator.timeBucketFlags = timeBucketFlags;
dimensionOperator.valueOperations = new HashMap<Integer, HashMap<String, HashSet<AggregateOperation>>>(valueOperations);
dimensionOperator.dimensionCombinationList = new HashMap<Integer, ArrayList<Integer>>(dimensionCombinationList);
Partition<DimensionOperator> partition = new DefaultPartition<DimensionOperator>(dimensionOperator);
newPartitions.add(partition);
}
catch (Throwable ex) {
DTThrowable.rethrow(ex);
}
}
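// number of high-order bits (above the low 16 filter bits) needed to distinguish the dynamic partitions created per filter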
int partitionBits = 32 - Integer.numberOfLeadingZeros(partitionSize / filters.length - 1);
int partitionMask = 0;
if (partitionBits > 0) {
partitionMask = -1 >>> (32 - partitionBits);
}
partitionMask = (partitionMask << 16) | 0xffff; // right most 16 bits used for functional partitioning
for (int i = 0; i < newPartitions.size(); i++) {
Partition<DimensionOperator> partition = newPartitions.get(i);
String partitionVal = filters[i % filters.length];
int bits = i / filters.length;
int filterId = registry.getIndex(LogstreamUtil.FILTER, partitionVal);
filterId = 0xffff & filterId; // clear out first 16 bits
int partitionKey = (bits << 16) | filterId; // first 16 bits for dynamic partitioning, last 16 bits for functional partitioning
logger.debug("partitionKey = {} partitionMask = {}", Integer.toBinaryString(partitionKey), Integer.toBinaryString(partitionMask));
partition.getPartitionKeys().put(in, new PartitionKeys(partitionMask, Sets.newHashSet(partitionKey)));
}
return newPartitions;
}
@Override
public void partitioned(Map<Integer, Partition<DimensionOperator>> partitions)
{
}
/**
* Sets the time key for dimension computations; if not provided, the window timestamp is used.
*
* @param timeKeyName name of the time key in the tuple
*/
public void setTimeKeyName(String timeKeyName)
{
this.timeKeyName = timeKeyName;
}
/**
* Extracts the meta information (log type and filter) from the tuple.
*
* @param tuple input tuple
*/
private void extractType(Map<String, Object> tuple)
{
recordType.put(LogstreamUtil.LOG_TYPE, (Number)tuple.get(LogstreamUtil.LOG_TYPE));
recordType.put(LogstreamUtil.FILTER, (Number)tuple.get(LogstreamUtil.FILTER));
}
private void createAllDimensionCombinations()
{
logger.info("need to create all dimensions for type {}", recordType.get(LogstreamUtil.LOG_TYPE));
//TODO create all dim combinations
// temporary code to skip null pointer
dimensionCombinationList.put((Integer)recordType.get(LogstreamUtil.LOG_TYPE), new ArrayList<Integer>());
}
public static class DimensionOperatorStreamCodec extends KryoSerializableStreamCodec<Map<String, Object>>
{
private static final Logger logger = LoggerFactory.getLogger(DimensionOperatorStreamCodec.class);
@Override
public int getPartition(Map<String, Object> o)
{
int filterId = (Integer)o.get(LogstreamUtil.FILTER);
int hashCode = o.hashCode();
filterId = 0xffff & filterId; // clear out first 16 bits
return (hashCode << 16) | filterId; // first 16 bits represent hashcode, last 16 bits represent filter type
}
private static final long serialVersionUID = 1L;
}
}