blob: e37ef4c1f94e1f3e0219f54288c8b9c2a0a588f8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.util;
import java.text.NumberFormat;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.Context.DAGContext;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
/**
* This is the base implementation of an operator. 
* The operator consumes tuples which are maps from fields to objects. 
* One field in each tuple is considered a "time" field,
* one or more fields are considered "dimensions",
* one or more fields are considered "value" fields. 
* This operator outputs a map at the end of each window whose keys are a combination of the "time" and "dimension" fields. 
* The value of the map is another map whose keys are "value" fields and whose values are a numeric metric. 
* Subclasses must implement the methods used to process the time, dimension, and value fields.
* <p>
* This operator takes in a Map of key value pairs, and output the expanded key value pairs from the dimensions.
* The user of this class is supposed to:
* - provide the time key name in the map by calling setTimeKeyName
* - provide the time bucket unit by calling setTimeBucketFlags
* - (optional) provide the value field keys by calling addValueKeyName.
* - provide the dimension keys by calling addDimensionKeyName
* - (optional) provide the combinations of the dimensions by calling addCombination (if not, it will expand to all possible combinations of the dimension keys
*
* Example:
* input is a map that represents a line in apache access log.
* "time" => the time
* "url" => the url
* "status" => the status code
* "ip" => the ip
* "bytes" => the number of bytes
*
* We set the time bucket to be just MINUTE, time key is "time", value field is "bytes", the dimension keys are "url", "status", and "ip"
*
* The output will be expanded into number_time_buckets * dimension_combinations * number_of_value_fields values. The key will be the combinations of the values in the dimension keys
*
* Note that in most cases, the dimensions are keys with DISCRETE values
* The value fields are keys with AGGREGATE-able values
* </p>
* @displayName Abstract Dimension Time Bucket
* @category Algorithmic
* @tags count, key value, numeric
* @since 0.3.2
*/
public abstract class AbstractDimensionTimeBucketOperator extends BaseOperator
{
private static final Logger LOG = LoggerFactory.getLogger(AbstractDimensionTimeBucketOperator.class);
/**
* This is the input port which receives tuples that are maps from strings to objects.
*/
@InputPortFieldAnnotation(optional = false)
public final transient DefaultInputPort<Map<String, Object>> in = new DefaultInputPort<Map<String, Object>>()
{
@Override
public void process(Map<String, Object> tuple)
{
try {
long time = extractTimeFromTuple(tuple);
calendar.setTimeInMillis(time);
List<String> timeBucketList = new ArrayList<String>();
if ((timeBucketFlags & TIMEBUCKET_YEAR) != 0) {
timeBucketList.add(String.format("Y|%04d", calendar.get(Calendar.YEAR)));
}
if ((timeBucketFlags & TIMEBUCKET_MONTH) != 0) {
timeBucketList.add(String.format("M|%04d%02d", calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1));
}
if ((timeBucketFlags & TIMEBUCKET_WEEK) != 0) {
timeBucketList.add(String.format("W|%04d%02d", calendar.get(Calendar.YEAR), calendar.get(Calendar.WEEK_OF_YEAR)));
}
if ((timeBucketFlags & TIMEBUCKET_DAY) != 0) {
timeBucketList.add(String.format("D|%04d%02d%02d", calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH)));
}
if ((timeBucketFlags & TIMEBUCKET_HOUR) != 0) {
timeBucketList.add(String.format("h|%04d%02d%02d%02d", calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH), calendar.get(Calendar.HOUR_OF_DAY)));
}
if ((timeBucketFlags & TIMEBUCKET_MINUTE) != 0) {
timeBucketList.add(String.format("m|%04d%02d%02d%02d%02d", calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH), calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE)));
}
for (String timeBucket : timeBucketList) {
for (int[] dimensionCombination : dimensionCombinations) {
String field = "0";
StringBuilder key = new StringBuilder();
if (dimensionCombination != null) {
for (int d : dimensionCombination) {
if (key.length() != 0) {
key.append("|");
}
key.append(String.valueOf(d)).append(":").append(tuple.get(dimensionKeyNames.get(d)).toString());
}
}
AbstractDimensionTimeBucketOperator.this.process(timeBucket, key.toString(), field, 1);
for (int i = 0; i < valueKeyNames.size(); i++) {
String valueKeyName = valueKeyNames.get(i);
field = String.valueOf(i + 1);
Object value = tuple.get(valueKeyName);
Number numberValue = extractNumber(valueKeyName, value);
AbstractDimensionTimeBucketOperator.this.process(timeBucket, key.toString(), field, numberValue);
}
}
}
} catch (Exception ex) {
LOG.warn("Got exception for tuple {}. Ignoring this tuple.", tuple, ex);
}
}
};
/**
* This is the output port which emits the processed data.
*/
@OutputPortFieldAnnotation(optional = false)
public final transient DefaultOutputPort<Map<String, Map<String, Number>>> out = new DefaultOutputPort<Map<String, Map<String, Number>>>();
private List<String> dimensionKeyNames = new ArrayList<String>();
private List<String> valueKeyNames = new ArrayList<String>();
private String timeKeyName;
private long currentWindowId;
private long windowWidth = 500;
private int timeBucketFlags;
private transient TimeZone timeZone = TimeZone.getTimeZone("GMT");
private transient Calendar calendar = new GregorianCalendar(timeZone);
private transient List<int[]> dimensionCombinations = new ArrayList<int[]>();
private List<Set<String>> dimensionCombinationsSet;
private transient NumberFormat numberFormat = NumberFormat.getInstance();
public static final int TIMEBUCKET_MINUTE = 1;
public static final int TIMEBUCKET_HOUR = 2;
public static final int TIMEBUCKET_DAY = 4;
public static final int TIMEBUCKET_WEEK = 8;
public static final int TIMEBUCKET_MONTH = 16;
public static final int TIMEBUCKET_YEAR = 32;
protected long extractTimeFromTuple(Map<String, Object> tuple)
{
long time;
if (timeKeyName == null) {
time = (currentWindowId >>> 32) * 1000 + windowWidth * (currentWindowId & 0xffffffffL);
} else {
time = (Long)tuple.get(timeKeyName);
}
return time;
}
protected Number extractNumber(String valueKeyName, Object value)
{
if (value instanceof Number) {
return (Number)value;
} else if (value == null) {
return new Long(0);
} else {
try {
return numberFormat.parse(value.toString());
} catch (ParseException ex) {
//Fixme
}
}
return new Long(0);
}
/*
* When calling this, the operator no longer expands all combinations of all
* keys but instead only use the combinations the caller supplies
*
* @param keys The key combination to add
*/
public void addCombination(Set<String> keys) throws NoSuchFieldException
{
if (dimensionCombinationsSet == null) {
dimensionCombinationsSet = new ArrayList<Set<String>>();
}
dimensionCombinationsSet.add(keys);
// if (keys == null) {
// dimensionCombinations.add(null);
// }
// else {
// // slow but this function doesn't get executed many times and
// dimensionKeyNames is small.
// int indexKeys[] = new int[keys.size()];
// int i = 0;
// for (String key : keys) {
// indexKeys[i] = -1;
// for (int j = 0; j < dimensionKeyNames.size(); j++) {
// if (dimensionKeyNames.get(j).equals(key)) {
// indexKeys[i] = j;
// break;
// }
// }
// if (indexKeys[i] < 0) {
// throw new NoSuchFieldException("Key not found: " + key);
// }
// i++;
// }
// dimensionCombinations.add(indexKeys);
// }
}
@Override
public void setup(OperatorContext context)
{
super.setup(context);
if (context != null) {
windowWidth = context.getValue(DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
}
if (dimensionCombinations.isEmpty() && dimensionCombinationsSet == null) {
dimensionCombinations.add(null);
for (int i = 1; i <= dimensionKeyNames.size(); i++) {
dimensionCombinations.addAll(Combinations.getNumberCombinations(dimensionKeyNames.size(), i));
}
} else if (dimensionCombinationsSet != null) {
for (Set<String> keySet : dimensionCombinationsSet) {
int[] indexKeys = new int[keySet.size()];
int i = 0;
for (String key : keySet) {
indexKeys[i] = -1;
for (int j = 0; j < dimensionKeyNames.size(); j++) {
if (dimensionKeyNames.get(j).equals(key)) {
indexKeys[i] = j;
break;
}
}
if (indexKeys[i] < 0) {
//fixme
}
i++;
}
dimensionCombinations.add(indexKeys);
}
}
logger.info("number of combinations {}",dimensionCombinations.size());
}
@Override
public void beginWindow(long windowId)
{
super.beginWindow(windowId);
currentWindowId = windowId;
}
/**
* Add the key that should be treated as one of the dimensions. The key must be in the input Map
*
* @param key
*/
public void addDimensionKeyName(String key)
{
dimensionKeyNames.add(key);
}
/**
* Add the key that should be treated as one of the value fields. The key must be in the input Map
*
* @param key
*/
public void addValueKeyName(String key)
{
valueKeyNames.add(key);
}
/**
* Set the key that should be treated as the timestamp. The timestamp is assumed to be in epoch millis
*
* @param key
*/
public void setTimeKeyName(String key)
{
timeKeyName = key;
}
/**
* The bit mask that represents which time unit to be used for time bucket.
*
* @param timeBucketFlags The bit mask that represents which time unit to be used for time bucket
*/
public void setTimeBucketFlags(int timeBucketFlags)
{
this.timeBucketFlags = timeBucketFlags;
}
/**
* Set the timezone of the time bucket
*
* @param tz
*/
public void setTimeZone(TimeZone tz)
{
timeZone = tz;
calendar.setTimeZone(timeZone);
}
/**
* The method that subclass must implement to provide routine on what to do with each timeBucket, key, value field and value expanded from the tuple
*
* @param timeBucket The time bucket string
* @param key The key, expanded from dimensions
* @param field The value field
* @param value The value
*/
public abstract void process(String timeBucket, String key, String field, Number value);
public static class Combinations
{
public static <K> List<List<K>> getCombinations(List<K> list, int r)
{
List<List<K>> result = new ArrayList<List<K>>();
List<int[]> combos = getNumberCombinations(list.size(), r);
for (int[] a : combos) {
List<K> keys = new ArrayList<K>(r);
for (int j = 0; j < r; j++) {
keys.add(j, list.get(a[j]));
}
result.add(keys);
}
return result;
}
public static List<int[]> getNumberCombinations(int n, int r)
{
List<int[]> list = new ArrayList<int[]>();
int[] res = new int[r];
for (int i = 0; i < res.length; i++) {
res[i] = i;
}
boolean done = false;
while (!done) {
int[] a = new int[res.length];
System.arraycopy(res, 0, a, 0, res.length);
list.add(a);
done = next(res, n, r);
}
return list;
}
private static boolean next(int[] num, int n, int r)
{
int target = r - 1;
num[target]++;
if (num[target] > ((n - (r - target)))) {
// Carry the One
while (num[target] > ((n - (r - target) - 1))) {
target--;
if (target < 0) {
break;
}
}
if (target < 0) {
return true;
}
num[target]++;
for (int i = target + 1; i < num.length; i++) {
num[i] = num[i - 1] + 1;
}
}
return false;
}
public static void main(String[] args)
{
String[] list = new String[]{"a", "b", "c", "d", "e"};
for (int i = 1; i <= list.length; i++) {
logger.info("Combinations: {}", getCombinations(Arrays.asList(list), i));
}
}
}
private static final Logger logger = LoggerFactory.getLogger(AbstractDimensionTimeBucketOperator.class);
}