blob: d9667ad8cadc41e12e2d9dd33495854b598849d9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.complete.game.injector;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.model.PublishRequest;
import com.google.api.services.pubsub.model.PubsubMessage;
import com.google.common.collect.ImmutableMap;
import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.TimeZone;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
/**
* This is a generator that simulates usage data from a mobile game, and either publishes the data
* to a pubsub topic or writes it to a file.
*
* <p>The general model used by the generator is the following. There is a set of teams with team
* members. Each member is scoring points for their team. After some period, a team will dissolve
* and a new one will be created in its place. There is also a set of 'Robots', or spammer users.
* They hop from team to team. The robots are set to have a higher 'click rate' (generate more
* events) than the regular team members.
*
* <p>Each generated line of data has the following form:
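* username,teamname,score,timestamp_in_ms,readable_time
* where timestamp_in_ms is the event time in milliseconds since the epoch, truncated to a whole
* second, and readable_time is a human-readable US Pacific time string, e.g.:
* user2_AsparagusPig,AsparagusPig,10,1445230923000,2015-10-18 22:02:03.951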
*
* <p>The Injector writes either to a PubSub topic, or a file. It will use the PubSub topic if
* specified. It takes the following arguments:
* {@code Injector project-name (topic-name|none) (filename|none)}.
*
* <p>To run the Injector in the mode where it publishes to PubSub, you will need to authenticate
* locally using project-based service account credentials to avoid running over PubSub
* quota.
* See https://developers.google.com/identity/protocols/application-default-credentials
* for more information on using service account credentials. Set the GOOGLE_APPLICATION_CREDENTIALS
* environment variable to point to your downloaded service account credentials before starting the
* program, e.g.:
* {@code export GOOGLE_APPLICATION_CREDENTIALS=/path/to/your/credentials-key.json}.
* If you do not do this, then your injector will only run for a few minutes on your
* 'user account' credentials before you will start to see quota error messages like:
* "Request throttled due to user QPS limit being reached", and see this exception:
* "com.google.api.client.googleapis.json.GoogleJsonResponseException: 429 Too Many Requests".
* Once you've set up your credentials, run the Injector like this:
* <pre>{@code
* Injector <project-name> <topic-name> none
* }
* </pre>
* The pubsub topic will be created if it does not exist.
*
* <p>To run the injector in write-to-file-mode, set the topic name to "none" and specify the
* filename:
* <pre>{@code
* Injector <project-name> none <filename>
* }
* </pre>
*/
class Injector {
  // PubSub client; assigned in main() before any publishing thread is started.
  private static Pubsub pubsub;
  // Shared by all publishing threads (java.util.Random instances are thread-safe).
  private static final Random random = new Random();
  // Fully qualified topic name; assigned in main() when publishing to PubSub.
  private static String topic;
  private static String project;
  // Name of the PubSub message attribute that carries the event timestamp.
  private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms";
  // Each regular (non-late) batch contains MIN_QPS to MIN_QPS + QPS_RANGE - 1 (800..999)
  // messages; a new batch is started roughly every THREAD_SLEEP_MS milliseconds.
  private static final int MIN_QPS = 800;
  private static final int QPS_RANGE = 200;
  // How long to sleep, in ms, between creation of the threads that make API requests to PubSub.
  private static final int THREAD_SLEEP_MS = 500;
  // Lists used to generate random team names.
  private static final List<String> COLORS =
      new ArrayList<String>(Arrays.asList(
          "Magenta", "AliceBlue", "Almond", "Amaranth", "Amber",
          "Amethyst", "AndroidGreen", "AntiqueBrass", "Fuchsia", "Ruby", "AppleGreen",
          "Apricot", "Aqua", "ArmyGreen", "Asparagus", "Auburn", "Azure", "Banana",
          "Beige", "Bisque", "BarnRed", "BattleshipGrey"));
  private static final List<String> ANIMALS =
      new ArrayList<String>(Arrays.asList(
          "Echidna", "Koala", "Wombat", "Marmot", "Quokka", "Kangaroo", "Dingo", "Numbat", "Emu",
          "Wallaby", "CaneToad", "Bilby", "Possum", "Cassowary", "Kookaburra", "Platypus",
          "Bandicoot", "Cockatoo", "Antechinus"));
  // The list of live teams. Shared between publishing threads; every access that reads and
  // then mutates it happens while holding the Injector class monitor (static synchronized).
  private static final List<TeamInfo> liveTeams = new ArrayList<TeamInfo>();
  // Formatter for the human-readable time column, in US Pacific time. The region ID is used
  // instead of the deprecated three-letter "PST" ID, which resolves to the same zone.
  private static final DateTimeFormatter TIME_FORMATTER =
      DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
          .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("America/Los_Angeles")));
  // The total number of robots in the system.
  private static final int NUM_ROBOTS = 20;
  // A new team gets a robot member with probability 1 / ROBOT_PROBABILITY.
  private static final int ROBOT_PROBABILITY = 3;
  private static final int NUM_LIVE_TEAMS = 15;
  private static final int BASE_MEMBERS_PER_TEAM = 5;
  private static final int MEMBERS_PER_TEAM = 15;
  private static final int MAX_SCORE = 20;
  // Every LATE_DATA_RATE-th batch (including the very first one) is a single late message.
  // With THREAD_SLEEP_MS = 500 that is at least 600 * 0.5 s = ~5 minutes between late events.
  private static final int LATE_DATA_RATE = 5 * 60 * 2;
  // Late events are delayed by BASE_DELAY_IN_MILLIS plus up to FUZZY_DELAY_IN_MILLIS (5-10 min).
  private static final int BASE_DELAY_IN_MILLIS = 5 * 60 * 1000;
  private static final int FUZZY_DELAY_IN_MILLIS = 5 * 60 * 1000;
  // A team lives BASE_TEAM_EXPIRATION_TIME_IN_MINS plus up to TEAM_EXPIRATION_TIME_IN_MINS
  // (exclusive) minutes.
  private static final int BASE_TEAM_EXPIRATION_TIME_IN_MINS = 20;
  private static final int TEAM_EXPIRATION_TIME_IN_MINS = 20;

  /**
   * A class for holding team info: the name of the team, when it started,
   * and the current team members. Teams may but need not include one robot team member.
   */
  private static class TeamInfo {
    final String teamName;
    final long startTimeInMillis;
    // Lifetime of the team, in minutes.
    final int expirationPeriod;
    // The team might but need not include 1 robot. Will be non-null if so.
    final String robot;
    // Number of regular (non-robot) members.
    final int numMembers;

    private TeamInfo(String teamName, long startTimeInMillis, String robot) {
      this.teamName = teamName;
      this.startTimeInMillis = startTimeInMillis;
      // How long until this team is dissolved.
      this.expirationPeriod = random.nextInt(TEAM_EXPIRATION_TIME_IN_MINS)
          + BASE_TEAM_EXPIRATION_TIME_IN_MINS;
      this.robot = robot;
      // Determine the number of team members.
      this.numMembers = random.nextInt(MEMBERS_PER_TEAM) + BASE_MEMBERS_PER_TEAM;
    }

    String getTeamName() {
      return teamName;
    }

    String getRobot() {
      return robot;
    }

    long getStartTimeInMillis() {
      return startTimeInMillis;
    }

    long getEndTimeInMillis() {
      return startTimeInMillis + (expirationPeriod * 60L * 1000L);
    }

    String getRandomUser() {
      int userNum = random.nextInt(numMembers);
      return "user" + userNum + "_" + teamName;
    }

    int numMembers() {
      return numMembers;
    }

    @Override
    public String toString() {
      return "(" + teamName + ", num members: " + numMembers() + ", starting at: "
          + startTimeInMillis + ", expires in: " + expirationPeriod + ", robot: " + robot + ")";
    }
  }

  /** Utility to grab a random element from a list of Strings. */
  private static String randomElement(List<String> list) {
    int index = random.nextInt(list.size());
    return list.get(index);
  }

  /**
   * Get and return a random team. If the selected team is too old w.r.t its expiration, remove
   * it, replacing it with a new team.
   *
   * <p>Synchronized on the class monitor (like {@link #addLiveTeam} and {@link #removeTeam}) so
   * that choosing an index and replacing the team at that index is atomic; this method is called
   * concurrently from the PubSub publishing threads.
   */
  private static synchronized TeamInfo randomTeam(List<TeamInfo> list) {
    int index = random.nextInt(list.size());
    TeamInfo team = list.get(index);
    // If the selected team is expired, remove it and return a new team.
    long currTime = System.currentTimeMillis();
    if ((team.getEndTimeInMillis() < currTime) || team.numMembers() == 0) {
      System.out.println("\nteam " + team + " is too old; replacing.");
      System.out.println("start time: " + team.getStartTimeInMillis()
          + ", end time: " + team.getEndTimeInMillis()
          + ", current time:" + currTime);
      removeTeam(index);
      // Add a new team in its stead.
      return addLiveTeam();
    }
    return team;
  }

  /**
   * Create and add a team. Possibly add a robot to the team.
   */
  private static synchronized TeamInfo addLiveTeam() {
    String teamName = randomElement(COLORS) + randomElement(ANIMALS);
    String robot = null;
    // Decide if we want to add a robot to the team.
    if (random.nextInt(ROBOT_PROBABILITY) == 0) {
      robot = "Robot-" + random.nextInt(NUM_ROBOTS);
    }
    // Create the new team.
    TeamInfo newTeam = new TeamInfo(teamName, System.currentTimeMillis(), robot);
    liveTeams.add(newTeam);
    System.out.println("[+" + newTeam + "]");
    return newTeam;
  }

  /**
   * Remove a specific team.
   */
  private static synchronized void removeTeam(int teamIndex) {
    TeamInfo removedTeam = liveTeams.remove(teamIndex);
    System.out.println("[-" + removedTeam + "]");
  }

  /** Generate a user gaming event. */
  private static String generateEvent(long currTime, int delayInMillis) {
    TeamInfo team = randomTeam(liveTeams);
    String teamName = team.getTeamName();
    String user;
    final int parseErrorRate = 900000;
    String robot = team.getRobot();
    // If the team has an associated robot team member...
    if (robot != null) {
      // Then use that robot for the message with some probability.
      // Set this probability to higher than that used to select any of the 'regular' team
      // members, so that if there is a robot on the team, it has a higher click rate.
      // Math.max guards against nextInt(0) should a team ever have fewer than 2 members.
      if (random.nextInt(Math.max(1, team.numMembers() / 2)) == 0) {
        user = robot;
      } else {
        user = team.getRandomUser();
      }
    } else { // No robot.
      user = team.getRandomUser();
    }
    String event = user + "," + teamName + "," + random.nextInt(MAX_SCORE);
    // Randomly introduce occasional parse errors.
    if (random.nextInt(parseErrorRate) == 0) {
      System.out.println("Introducing a parse error.");
      event = "THIS LINE REPRESENTS CORRUPT DATA AND WILL CAUSE A PARSE ERROR";
    }
    return addTimeInfoToEvent(event, currTime, delayInMillis);
  }

  /**
   * Returns the event timestamp, in milliseconds since the epoch truncated to a whole second, for
   * an event generated at {@code currTime} and delayed by {@code delayInMillis}. Used for both the
   * message body and the PubSub timestamp attribute so that the two always agree.
   */
  private static String eventTimestampString(long currTime, int delayInMillis) {
    return Long.toString((currTime - delayInMillis) / 1000 * 1000);
  }

  /**
   * Add time info to a generated gaming event: the (second-truncated) event timestamp in ms and a
   * human-readable rendering of the event time.
   */
  private static String addTimeInfoToEvent(String message, long currTime, int delayInMillis) {
    String eventTimeString = eventTimestampString(currTime, delayInMillis);
    // Add a (redundant) 'human-readable' date string to make the data semantics more clear.
    // It renders the event time (not the generation time) so that it matches the timestamp
    // column for late data too.
    String dateString = TIME_FORMATTER.print(currTime - delayInMillis);
    return message + "," + eventTimeString + "," + dateString;
  }

  /**
   * Publish 'numMessages' arbitrary events from live users with the provided delay, to a
   * PubSub topic. At least one message is always published.
   *
   * @throws IOException if the PubSub publish request fails
   */
  public static void publishData(int numMessages, int delayInMillis)
      throws IOException {
    int count = Math.max(1, numMessages);
    List<PubsubMessage> pubsubMessages = new ArrayList<>(count);
    for (int i = 0; i < count; i++) {
      long currTime = System.currentTimeMillis();
      String message = generateEvent(currTime, delayInMillis);
      PubsubMessage pubsubMessage = new PubsubMessage()
          .encodeData(message.getBytes(StandardCharsets.UTF_8));
      pubsubMessage.setAttributes(
          ImmutableMap.of(TIMESTAMP_ATTRIBUTE, eventTimestampString(currTime, delayInMillis)));
      if (delayInMillis != 0) {
        System.out.println(pubsubMessage.getAttributes());
        System.out.println("late data for: " + message);
      }
      pubsubMessages.add(pubsubMessage);
    }
    PublishRequest publishRequest = new PublishRequest();
    publishRequest.setMessages(pubsubMessages);
    pubsub.projects().topics().publish(topic, publishRequest).execute();
  }

  /**
   * Publish generated events to a file, appending one event per line (UTF-8). At least one
   * event is always written.
   *
   * @throws IOException if the file cannot be opened or any write to it fails
   */
  public static void publishDataToFile(String fileName, int numMessages, int delayInMillis)
      throws IOException {
    try (PrintWriter out = new PrintWriter(new OutputStreamWriter(
        new BufferedOutputStream(new FileOutputStream(fileName, true)),
        StandardCharsets.UTF_8))) {
      for (int i = 0; i < Math.max(1, numMessages); i++) {
        long currTime = System.currentTimeMillis();
        out.println(generateEvent(currTime, delayInMillis));
      }
      // PrintWriter never throws on write failures, it only records them; checkError() flushes
      // and reports whether any write failed, so surface that instead of losing data silently.
      if (out.checkError()) {
        throw new IOException("Error writing generated events to file: " + fileName);
      }
    }
  }

  public static void main(String[] args) throws IOException, InterruptedException {
    if (args.length < 3) {
      System.out.println("Usage: Injector project-name (topic-name|none) (filename|none)");
      System.exit(1);
    }
    boolean writeToFile = false;
    boolean writeToPubsub = true;
    project = args[0];
    String topicName = args[1];
    String fileName = args[2];
    // The Injector writes either to a PubSub topic, or a file. It will use the PubSub topic if
    // specified; otherwise, it will try to write to a file.
    if (topicName.equalsIgnoreCase("none")) {
      writeToFile = true;
      writeToPubsub = false;
    }
    if (writeToPubsub) {
      // Create the PubSub client.
      pubsub = InjectorUtils.getClient();
      // Create the PubSub topic as necessary.
      topic = InjectorUtils.getFullyQualifiedTopicName(project, topicName);
      InjectorUtils.createTopic(pubsub, topic);
      System.out.println("Injecting to topic: " + topic);
    } else {
      if (fileName.equalsIgnoreCase("none")) {
        System.out.println("Filename not specified.");
        System.exit(1);
      }
      System.out.println("Writing to file: " + fileName);
    }
    System.out.println("Starting Injector");

    // Start off with some random live teams.
    while (liveTeams.size() < NUM_LIVE_TEAMS) {
      addLiveTeam();
    }

    // Publish messages at a rate determined by the QPS and Thread sleep settings.
    // A long counter avoids int overflow on very long runs.
    for (long i = 0; true; i++) {
      if (Thread.activeCount() > 10) {
        System.err.println("I'm falling behind!");
      }
      // Decide if this should be a batch of late data.
      final int numMessages;
      final int delayInMillis;
      if (i % LATE_DATA_RATE == 0) {
        // Insert delayed data for one user (one message only)
        delayInMillis = BASE_DELAY_IN_MILLIS + random.nextInt(FUZZY_DELAY_IN_MILLIS);
        numMessages = 1;
        System.out.println("DELAY(" + delayInMillis + ", " + numMessages + ")");
      } else {
        System.out.print(".");
        delayInMillis = 0;
        numMessages = MIN_QPS + random.nextInt(QPS_RANGE);
      }
      if (writeToFile) { // Won't use threading for the file write.
        publishDataToFile(fileName, numMessages, delayInMillis);
      } else { // Write to PubSub.
        // Start a thread to inject some data.
        new Thread() {
          @Override
          public void run() {
            try {
              publishData(numMessages, delayInMillis);
            } catch (IOException e) {
              System.err.println(e);
            }
          }
        }.start();
      }
      // Wait before creating another injector thread.
      Thread.sleep(THREAD_SLEEP_MS);
    }
  }
}