blob: 06b3027e2cf8e842afa0486042f6a08bd7b9cafc [file] [log] [blame]
/*
 * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.datatorrent.demos.dimensions.generic;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import javax.validation.constraints.Min;
import java.util.Map;
import java.util.Random;
/**
 * Generates sales event data and sends it out as JSON encoded byte arrays.
 * <p>
 * Sales events are JSON strings encoded as byte arrays. All IDs are expected to be positive integers, and default to 1.
 * Transaction amounts are double values with two decimal places. Timestamp is Unix epoch in milliseconds.
 * Product categories are not assigned by default. They are expected to be added by the Enrichment operator, but can be
 * enabled with the addProductCategory override.
 * <p>
 * Example sales event:
 * <pre>
 * {
 *   "productId": 1,
 *   "customerId": 12345,
 *   "productCategory": 0,
 *   "regionId": 2,
 *   "channelId": 3,
 *   "amount": 107.99,
 *   "tax": 7.99,
 *   "discount": 15.73,
 *   "timestamp": 1412897574000
 * }
 * </pre>
 *
 * @displayName JSON Sales Event Generator
 * @category Input
 * @tags input, generator, json
 * @since 2.0.0
 */
public class JsonSalesGenerator implements InputOperator
// Largest product id generated; product ids are drawn from 1..maxProductId
private int maxProductId = 100;
// Largest customer id; the customer id space is split evenly between regions
private int maxCustomerId = 1000000;
// Channel ids range over 1..maxChannelId
private int maxChannelId = 3;
// Region ids range over 1..maxRegionId
private int maxRegionId = 10;
// Lower bound passed to randomAmount when generating the transaction amount
private double minAmount = 0.99;
// Upper bound passed to randomAmount when generating the transaction amount
private double maxAmount = 100.0;
// Candidate tax rates; generateRegionalTax assigns one of these to each region at random
private double taxTiers[] = new double[] { 0.0, 0.04, 0.055, 0.0625, 0.0725, 0.085};
// Candidate discount rates; generateDiscounts picks from these per channel and per region
private double discountTiers[] = new double[] {0.0, 0.025, 0.05, 0.10, 0.15, 0.50};
// Cap applied to the combined channel + regional discount rate in calculateDiscount
private double maxDiscountPercent = 0.75;
// Should not be included by default - only used for testing when running without enrichment operator
private boolean addProductCategory = false;
// Modulus used to derive a product category from the product id when addProductCategory is set
private int maxProductCategories = 5;
// Limit number of emitted tuples per window
private long maxTuplesPerWindow = 40000;
// Maximum amount of deviation below the maximum tuples per window
private int tuplesPerWindowDeviation = 20000;
// Number of windows to maintain the same deviation before selecting another
private int tuplesRateCycle = 40;
// Number of windows to maintain the same discount before selecting another
private int discountCycle = 600;
// Number of windows between updates of the region selection probability (checked in beginWindow)
private int regionalCycle = 60;
// Number of windows between updates of the channel selection probability (checked in beginWindow)
private int channelCycle = 60;
/** Outputs sales events in JSON format as byte arrays. */
public final transient DefaultOutputPort<byte[]> jsonBytes = new DefaultOutputPort<byte[]>();
// Shared JSON mapper; serialization inclusion is set to NON_NULL
private static final ObjectMapper mapper = new ObjectMapper().setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
// Single source of randomness for ids, amounts, tier selection and the per-window rate deviation
private final Random random = new Random();
// Loop counter for the current window: reset in beginWindow, incremented by emitTuples
private long tuplesCounter = 0;
// Tuple budget for the current window; recomputed in beginWindow on rate-cycle boundaries
private long tuplesPerCurrentWindow = maxTuplesPerWindow;
// Discount rate keyed by channel id, filled by generateDiscounts
private transient Map<Integer, Double> channelDiscount = Maps.newHashMap();
// Discount rate keyed by region id, filled by generateDiscounts
private transient Map<Integer, Double> regionalDiscount = Maps.newHashMap();
// Tax rate keyed by region id, filled by generateRegionalTax
private transient Map<Integer, Double> regionalTax = Maps.newHashMap();
// Generator associated with region ids (see initializeDataGenerators)
private transient RandomWeightedMovableGenerator<Integer> regionalGenerator = new RandomWeightedMovableGenerator<Integer>();
// Generator associated with channel ids; its move deviation is doubled in initializeDataGenerators
private transient RandomWeightedMovableGenerator<Integer> channelGenerator = new RandomWeightedMovableGenerator<Integer>();
// Start-of-window hook: resets the per-window tuple counter and, when windowId is a multiple of
// the relevant cycle length, refreshes the output rate, discounts and selection probabilities.
// NOTE(review): the bodies of the discount/channel/region branches and all closing braces are
// not present in this copy of the file - restore them from the upstream source.
public void beginWindow(long windowId)
tuplesCounter = 0;
// Generate new output rate after tuplesRateCycle windows ONLY if tuplesPerWindowDeviation is non-zero
if (windowId % tuplesRateCycle == 0 && tuplesPerWindowDeviation > 0) {
// New budget is the maximum minus a random value in [0, tuplesPerWindowDeviation)
tuplesPerCurrentWindow = maxTuplesPerWindow - random.nextInt(tuplesPerWindowDeviation);
// Generate new discounts after every discountCycle windows
if ( windowId % discountCycle == 0) {
// Update channel selection probability (represents sales volume shifts between channels)
if (windowId % channelCycle == 0) {
// Update region selection probability (represents sales volume shifts between regions)
if (windowId % regionalCycle == 0) {
// End-of-window hook; no statements are visible for it in this copy of the file.
public void endWindow()
// Operator setup hook: starts with the full per-window tuple budget.
// NOTE(review): the transient maps and generators must be populated before events are generated;
// presumably that also happens here, but no such calls are visible in this copy - verify against upstream.
public void setup(Context.OperatorContext context)
tuplesPerCurrentWindow = maxTuplesPerWindow;
// Operator teardown hook; no statements are visible for it in this copy of the file.
public void teardown()
// Generates sales events until tuplesCounter reaches the budget for the current window
// (tuplesPerCurrentWindow). Any exception raised while producing an event is rethrown unchecked.
public void emitTuples()
while (tuplesCounter++ < tuplesPerCurrentWindow) {
try {
SalesEvent salesEvent = generateSalesEvent();
// NOTE(review): no statement consuming salesEvent is visible here - presumably it is serialized
// with mapper and emitted on jsonBytes; verify against upstream.
} catch (Exception ex) {
// Wrap in an unchecked exception, keeping the original as the cause
throw new RuntimeException(ex);
/** Initializes the regional and channel data generators. */
void initializeDataGenerators() {
// Channels - with increased level of change
// The channel generator's move deviation is set to twice its current value
channelGenerator.setMoveDeviation(channelGenerator.getMoveDeviation() * 2);
// NOTE(review): loop body not visible in this copy - presumably registers each channel id
// (1..maxChannelId) with channelGenerator; confirm against upstream.
for (int i=1; i <= getMaxChannelId(); i++) {
// Regions - default level of change
// NOTE(review): loop body not visible in this copy - presumably registers each region id
// (1..maxRegionId) with regionalGenerator; confirm against upstream.
for (int i=1; i <= getMaxRegionId(); i++) {
* Generate discounts per sales channel and region based on discount tiers
void generateDiscounts() {
// Discounts per sales channel
for (int i=1; i <= getMaxChannelId(); i++) {
// 50% chance of applying discount to a channel
channelDiscount.put(i, (random.nextBoolean()) ? discountTiers[random.nextInt(discountTiers.length)] : 0.0);
// Discounts per region
for (int i=1; i <= getMaxRegionId(); i++) {
// 50% chance of applying discount to a region
regionalDiscount.put(i, (random.nextBoolean()) ? discountTiers[random.nextInt(discountTiers.length)] : 0.0);
* Generate taxes based on each region
void generateRegionalTax() {
for (int i=1; i <= getMaxRegionId(); i++) {
regionalTax.put(i, taxTiers[random.nextInt(taxTiers.length)]);
// Builds one randomized sales event stamped with the current wall-clock time.
// NOTE(review): several statements in this method are truncated in this copy of the file
// (see the notes below); restore them from the upstream source before compiling.
SalesEvent generateSalesEvent() throws Exception {
SalesEvent salesEvent = new SalesEvent();
salesEvent.timestamp = System.currentTimeMillis();
salesEvent.productId = randomId(maxProductId);
// NOTE(review): right-hand side missing - presumably drawn from channelGenerator; confirm
salesEvent.channelId =;
// NOTE(review): right-hand side missing - presumably drawn from regionalGenerator; confirm
salesEvent.regionId =;
// Customer id is taken from the id range that belongs to the chosen region
salesEvent.customerId = randomCustomerByRegion(salesEvent.regionId);
// NOTE(review): three statements appear fused on the next line, and the assignment targets of the
// calculateTax(...) and calculateDiscount(...) results are missing - presumably salesEvent.tax
// and salesEvent.discount; confirm
salesEvent.amount = randomAmount(minAmount, maxAmount); = calculateTax(salesEvent.amount, salesEvent.regionId); = calculateDiscount(salesEvent.amount, salesEvent.channelId, salesEvent.regionId);
// Optional category derived from the product id: 1 + (productId modulo maxProductCategories)
if (addProductCategory) {
salesEvent.productCategory = 1 + (salesEvent.productId % maxProductCategories);
return salesEvent;
private int randomId(int max) {
// Provide safe default for invalid max
if (max < 1) return 1;
return 1 + random.nextInt(max);
private int randomCustomerByRegion(int regionId) {
int regionMultiplier = getMaxCustomerId() / getMaxRegionId();
return (regionId * regionMultiplier) - random.nextInt(regionMultiplier);
private double calculateTax(double amount, int regionId) {
double rawAmount = amount * regionalTax.get(regionId);
return Math.floor(rawAmount * 100) / 100;
private double calculateDiscount(double amount, int channelId, int regionId) {
// Total discount is combination of channel and region discounts up to maximum allowed discount percentage
double rawAmount = amount * Math.min(channelDiscount.get(channelId) + regionalDiscount.get(regionId), maxDiscountPercent);
return Math.floor(rawAmount * 100) / 100;
// Generate random double with gaussian distribution between min and max and two decimal places
private double randomAmount(double min, double max) {
// Provide safe default for invalid min/max relationships
if (max <= min) return Math.floor(min * 100) / 100;
double mean = (min + max)/2.0;
double deviation = (max - min)/2.0/3.0;
double rawAmount;
do {
rawAmount = random.nextGaussian() * deviation + mean;
} while (rawAmount < minAmount || rawAmount > maxAmount);
return Math.floor(rawAmount * 100) / 100;
// Generate random tax given transaction amount
private double randomPercent(double amount, double percent) {
double tax = amount * ( random.nextDouble() * percent);
return Math.floor(tax * 100) / 100;
public long getMaxTuplesPerWindow() {
return maxTuplesPerWindow;
public void setMaxTuplesPerWindow(long maxTuplesPerWindow) {
this.maxTuplesPerWindow = maxTuplesPerWindow;
public int getMaxProductId() {
return maxProductId;
public void setMaxProductId(int maxProductId) {
if (maxProductId >= 1)
this.maxProductId = maxProductId;
public int getMaxCustomerId() {
return maxCustomerId;
public void setMaxCustomerId(int maxCustomerId) {
if (maxCustomerId >= 1)
this.maxCustomerId = maxCustomerId;
public int getMaxChannelId() {
return maxChannelId;
public void setMaxChannelId(int maxChannelId) {
if (maxChannelId >= 1)
this.maxChannelId = maxChannelId;
public double getMinAmount() {
return minAmount;
public void setMinAmount(double minAmount) {
this.minAmount = minAmount;
public double getMaxAmount() {
return maxAmount;
public void setMaxAmount(double maxAmount) {
this.maxAmount = maxAmount;
public boolean isAddProductCategory() {
return addProductCategory;
public void setAddProductCategory(boolean addProductCategory) {
this.addProductCategory = addProductCategory;
public int getMaxProductCategories() {
return maxProductCategories;
public void setMaxProductCategories(int maxProductCategories) {
this.maxProductCategories = maxProductCategories;
public int getTuplesPerWindowDeviation() {
return tuplesPerWindowDeviation;
public void setTuplesPerWindowDeviation(int tuplesPerWindowDeviation) {
this.tuplesPerWindowDeviation = tuplesPerWindowDeviation;
public int getTuplesRateCycle() {
return tuplesRateCycle;
public void setTuplesRateCycle(int tuplesRateCycle) {
this.tuplesRateCycle = tuplesRateCycle;
public int getMaxRegionId() {
return maxRegionId;
public void setMaxRegionId(int maxRegionId) {
if (maxRegionId >= 1)
this.maxRegionId = maxRegionId;
public double getMaxDiscountPercent() {
return maxDiscountPercent;
public void setMaxDiscountPercent(double maxDiscountPercent) {
this.maxDiscountPercent = maxDiscountPercent;
public int getDiscountCycle() {
return discountCycle;
public void setDiscountCycle(int discountCycle) {
this.discountCycle = discountCycle;
/** A single sales event. */
// Plain data holder for one generated sales event; every field is public.
class SalesEvent {
/* dimension keys */
// Creation time in epoch milliseconds (set from System.currentTimeMillis())
public long timestamp;
// Product identifier, generated by randomId(maxProductId)
public int productId;
// Customer identifier, chosen from the id range of the event's region
public int customerId;
// Sales channel identifier
public int channelId;
// Region identifier
public int regionId;
// Product category; only assigned by the generator when addProductCategory is enabled
public int productCategory;
/* metrics */
// Transaction amount, floored to two decimals
public double amount;
// Discount value; calculateDiscount produces values floored to two decimals
public double discount;
// Tax value; calculateTax produces values floored to two decimals
public double tax;