| /* |
| * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.datatorrent.demos.dimensions.generic; |
| |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| import org.apache.commons.lang.builder.ToStringBuilder; |
| import org.apache.commons.lang.builder.ToStringStyle; |
| import org.codehaus.jackson.map.ObjectMapper; |
| |
| import java.io.Serializable; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| /** |
| * Describes schema for performing dimensional computation on a stream of Map<String,Object> tuples. |
| * |
| * Schema can be specified as a JSON string with following keys. |
| * |
| * fields: Map of all the field names and their types. Supported types: java.lang.(Integer, Long, Float, Double) |
| * dimension: Array of dimensions with fields separated by colon, and time prefixed with time=. Supported time units: MINUTES, HOURS, DAYS |
| * aggregates: Fields to aggregate for specified dimensions. Aggregates types can include: sum, avg, min, max |
| * timestamp: Name of the timestamp field. Data type should be Long with value in milliseconds since Jan 1, 1970 GMT. |
| * |
| * JSON schema for Ads demo: |
| * |
| * { |
| * "fields": { |
| * "publisherId": "java.lang.Integer", |
| * "advertiserId": "java.lang.Integer", |
| * "adUnit": "java.lang.Integer", |
| * "clicks": "java.lang.Long", |
| * "price": "java.lang.Long", |
| * "cost": "java.lang.Double", |
| * "revenue": "java.lang.Double", |
| * "timestamp": "java.lang.Long", |
| * "impressions": "java.lang.Long" |
| * }, |
| * "dimensions": [ |
| * "time=MINUTES", |
| * "time=MINUTES:adUnit", |
| * "time=MINUTES:advertiserId", |
| * "time=MINUTES:publisherId", |
| * "time=MINUTES:advertiserId:adUnit", |
| * "time=MINUTES:publisherId:adUnit", |
| * "time=MINUTES:publisherId:advertiserId", |
| * "time=MINUTES:publisherId:advertiserId:adUnit" |
| * ], |
| * "aggregates": { |
| * "impressions": "sum", |
| * "clicks": "sum", |
| * "price": "sum", |
| * "cost": "sum", |
| * "revenue": "sum" |
| * }, |
| * "timestamp": "timestamp" |
| * } |
| * |
| * JSON schema for Sales demo: |
| * |
| * { |
| * "fields": { |
| * "timestamp": "java.lang.Long", |
| * "productId": "java.lang.Integer", |
| * "customerId": "java.lang.Integer", |
| * "channelId": "java.lang.Integer", |
| * "regionId": "java.lang.Integer", |
| * "productCategory": "java.lang.Integer", |
| * "amount": "java.lang.Double", |
| * "discount": "java.lang.Double", |
| * "tax": "java.lang.Double" |
| * }, |
| * "dimensions": [ |
| * "time=MINUTES", |
| * "time=MINUTES:productCategory", |
| * "time=MINUTES:channelId", |
| * "time=MINUTES:regionId", |
| * "time=MINUTES:productCategory:channelId", |
| * "time=MINUTES:productCategory:regionId", |
| * "time=MINUTES:channelId:regionId", |
| * "time=MINUTES:productCategory:channelId:regionId" |
| * ], |
| * "aggregates": { |
| * "amount": "sum", |
| * "discount": "sum", |
| * "tax": "sum" |
| * }, |
| * "timestamp": "timestamp" |
| * } |
| * |
| */ |
| public class EventSchema implements Serializable |
| { |
| private static final long serialVersionUID = 4586481500190519858L; |
| |
| public Map<String, Class<?>> fields; |
| |
| public List<String> keys; |
| public Map<String, String> aggregates; |
| public List<String> dimensions; |
| |
| public String timestamp = "timestamp"; |
| |
| // Used to map between event schema and generic event arrays for keys and values |
| transient public List<String> genericEventKeys = Lists.newArrayList(); |
| transient public List<String> genericEventValues = Lists.newArrayList(); |
| |
| transient private int keyLen; |
| transient private int valLen; |
| |
| public static final String DEFAULT_SCHEMA_ADS = "{\n" + |
| " \"fields\": {\n" + |
| " \"publisherId\": \"java.lang.Integer\",\n" + |
| " \"advertiserId\": \"java.lang.Integer\",\n" + |
| " \"adUnit\": \"java.lang.Integer\",\n" + |
| " \"clicks\": \"java.lang.Long\",\n" + |
| " \"price\": \"java.lang.Long\",\n" + |
| " \"cost\": \"java.lang.Double\",\n" + |
| " \"revenue\": \"java.lang.Double\",\n" + |
| " \"timestamp\": \"java.lang.Long\",\n" + |
| " \"impressions\": \"java.lang.Long\"\n" + |
| " },\n" + |
| " \"dimensions\": [\n" + |
| " \"time=MINUTES\",\n" + |
| " \"time=MINUTES:adUnit\",\n" + |
| " \"time=MINUTES:advertiserId\",\n" + |
| " \"time=MINUTES:publisherId\",\n" + |
| " \"time=MINUTES:advertiserId:adUnit\",\n" + |
| " \"time=MINUTES:publisherId:adUnit\",\n" + |
| " \"time=MINUTES:publisherId:advertiserId\",\n" + |
| " \"time=MINUTES:publisherId:advertiserId:adUnit\"\n" + |
| " ],\n" + |
| " \"aggregates\": {\n" + |
| " \"impressions\": \"sum\",\n" + |
| " \"clicks\": \"sum\",\n" + |
| " \"price\": \"sum\",\n" + |
| " \"cost\": \"sum\",\n" + |
| " \"revenue\": \"sum\"\n" + |
| " },\n" + |
| " \"timestamp\": \"timestamp\"\n" + |
| "}"; |
| public static final String DEFAULT_SCHEMA_SALES = "{\n" + |
| " \"fields\": {\n" + |
| " \"timestamp\": \"java.lang.Long\",\n" + |
| " \"productId\": \"java.lang.Integer\",\n" + |
| " \"customerId\": \"java.lang.Integer\",\n" + |
| " \"channelId\": \"java.lang.Integer\",\n" + |
| " \"regionId\": \"java.lang.Integer\",\n" + |
| " \"productCategory\": \"java.lang.Integer\",\n" + |
| " \"amount\": \"java.lang.Double\",\n" + |
| " \"discount\": \"java.lang.Double\",\n" + |
| " \"tax\": \"java.lang.Double\"\n" + |
| " },\n" + |
| " \"dimensions\": [\n" + |
| " \"time=MINUTES\",\n" + |
| " \"time=MINUTES:productCategory\",\n" + |
| " \"time=MINUTES:channelId\",\n" + |
| " \"time=MINUTES:regionId\",\n" + |
| " \"time=MINUTES:productCategory:channelId\",\n" + |
| " \"time=MINUTES:productCategory:regionId\",\n" + |
| " \"time=MINUTES:channelId:regionId\",\n" + |
| " \"time=MINUTES:productCategory:channelId:regionId\"\n" + |
| " ],\n" + |
| " \"aggregates\": {\n" + |
| " \"amount\": \"sum\",\n" + |
| " \"discount\": \"sum\",\n" + |
| " \"tax\": \"sum\"\n" + |
| " },\n" + |
| " \"timestamp\": \"timestamp\"\n" + |
| "}"; |
| |
| public static EventSchema createFromJSON(String json) throws Exception { |
| ObjectMapper mapper = new ObjectMapper(); |
| EventSchema eventSchema = mapper.readValue(json, EventSchema.class); |
| |
| if ( eventSchema.dimensions.isEmpty() ) { |
| throw new IllegalArgumentException("EventSchema JSON must specify dimensions list"); |
| } |
| |
| // Generate list of keys from dimensions specified |
| Set<String> uniqueKeys = Sets.newHashSet(); |
| for(String dimension: eventSchema.dimensions) { |
| String[] attributes = dimension.split(":"); |
| for (String attribute : attributes) { |
| String[] keyval = attribute.split("=", 2); |
| String key = keyval[0]; |
| if (key.equals("time")) { |
| continue; |
| } |
| uniqueKeys.add(key); |
| } |
| } |
| eventSchema.genericEventKeys.addAll(uniqueKeys); |
| eventSchema.keys.addAll(uniqueKeys); |
| eventSchema.keys.add(eventSchema.getTimestamp()); |
| |
| eventSchema.genericEventValues.addAll(eventSchema.aggregates.keySet()); |
| |
| return eventSchema; |
| } |
| |
| public EventSchema() |
| { |
| this.dimensions = Lists.newArrayList(); |
| this.aggregates = Maps.newHashMap(); |
| this.keys = Lists.newArrayList(); |
| this.fields = Maps.newHashMap(); |
| } |
| |
| public String getTimestamp() { |
| return timestamp; |
| } |
| |
| public void setFields(Map<String, Class<?>> fields) |
| { |
| this.fields = fields; |
| } |
| |
| public void setKeys(List<String> keys) |
| { |
| this.keys = keys; |
| } |
| |
| public Collection<String> getGenericEventValues() { |
| return aggregates.keySet(); |
| } |
| |
| public void setAggregates(Map<String, String> aggregates) |
| { |
| this.aggregates = aggregates; |
| } |
| |
| public Class<?> getClass(String field) { |
| return fields.get(field); |
| } |
| |
| public int getKeyLen() { |
| if (keyLen == 0) |
| keyLen = getSerializedLength(keys); |
| return keyLen; |
| } |
| |
| public int getValLen() { |
| if (valLen == 0) |
| valLen = getSerializedLength(getGenericEventValues()); |
| return valLen; |
| } |
| |
| public int getSerializedLength(Collection<String> fields) { |
| int len = 0; |
| for(String field : fields) { |
| Class<?> k = this.fields.get(field); |
| len += GenericAggregateSerializer.fieldSerializers.get(k).dataLength(); |
| } |
| return len; |
| } |
| |
| public Class<?> getType(String param) |
| { |
| return fields.get(param); |
| } |
| |
| public Object typeCast(String input, String fieldKey) { |
| if (input == null) return null; |
| Class<?> c = getType(fieldKey); |
| |
| if (c.equals(Integer.class)) return Integer.valueOf(input); |
| else if (c.equals(Long.class)) return Long.valueOf(input); |
| else if (c.equals(Float.class)) return Float.valueOf(input); |
| else if (c.equals(Double.class)) return Float.valueOf(input); |
| else return input; |
| } |
| |
| public static Map<Class<?>,Object> DEFAULT_VALUES = Maps.newHashMap(); |
| static { |
| DEFAULT_VALUES.put(Integer.class, 0); |
| DEFAULT_VALUES.put(Long.class, 0L); |
| DEFAULT_VALUES.put(Float.class, 0f); |
| DEFAULT_VALUES.put(Double.class, 0d); |
| DEFAULT_VALUES.put(String.class, ""); |
| } |
| public Object defaultValue(String key) { |
| Class<?> c = getType(key); |
| return DEFAULT_VALUES.get(c); |
| } |
| |
| |
| @Override |
| public String toString() { |
| return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE); |
| } |
| |
| |
| Object getValue(GenericAggregate ga, String name) { |
| if ( genericEventKeys.contains(name)) |
| return ga.keys[genericEventKeys.indexOf(name)]; |
| else if ( genericEventValues.contains(name)) |
| return ga.aggregates[genericEventValues.indexOf(name)]; |
| else |
| return null; |
| } |
| |
| public void defaultMissingFields(GenericAggregate ga) { |
| for(int i=0; i < genericEventKeys.size(); i++) |
| { |
| if (ga.keys[i] == null) |
| ga.keys[i] = defaultValue(genericEventKeys.get(i)); |
| } |
| for(int i=0; i < genericEventValues.size(); i++) |
| { |
| if (ga.aggregates[i] == null) |
| ga.aggregates[i] = defaultValue(genericEventValues.get(i)); |
| } |
| } |
| |
| GenericEvent convertQueryKeysToGenericEvent(Map<String, String> keys) |
| { |
| GenericEvent event = new GenericEvent(); |
| event.keys = new Object[genericEventKeys.size()]; |
| int idx = 0; |
| for(String key : genericEventKeys) |
| { |
| Object value = typeCast(keys.get(key), key); |
| event.keys[idx++] = (value == null) ? defaultValue(key) : value; |
| } |
| |
| event.values = new Object[genericEventValues.size()]; |
| idx = 0; |
| for(String key : genericEventValues) |
| { |
| event.values[idx++] = defaultValue(key); |
| } |
| |
| return event; |
| } |
| |
| |
| GenericEvent convertMapToGenericEvent(Map<String, Object> tuple) |
| { |
| GenericEvent event = new GenericEvent(); |
| Object timeValue = tuple.get(getTimestamp()); |
| if (timeValue != null) { |
| event.timestamp = ((Number)timeValue).longValue(); |
| } |
| event.keys = new Object[genericEventKeys.size()]; |
| int idx = 0; |
| for(String key : genericEventKeys) |
| { |
| Object value = tuple.get(key); |
| event.keys[idx++] = (value == null) ? defaultValue(key) : value; |
| } |
| event.values = new Object[genericEventValues.size()]; |
| idx = 0; |
| for(String key : genericEventValues) |
| { |
| Object value = tuple.get(key); |
| event.values[idx++] = (value == null) ? defaultValue(key) : value; |
| } |
| return event; |
| } |
| |
| Map<String, Object> convertAggregateEventToMap(GenericAggregate ga) { |
| Map<String, Object> map = Maps.newHashMap(); |
| |
| for (int i=0; i < genericEventKeys.size(); i++) { |
| map.put(genericEventKeys.get(i), ga.keys[i]); |
| } |
| |
| for (int i=0; i < genericEventValues.size(); i++) { |
| map.put(genericEventValues.get(i), ga.aggregates[i]); |
| } |
| |
| map.put(timestamp, ga.getTimestamp()); |
| |
| return map; |
| } |
| |
| |
| public Class<?> getAggregateType(int i) |
| { |
| return getType(genericEventValues.get(i)); |
| } |
| } |