/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.examples.complete.game.injector;

import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.model.PublishRequest;
import com.google.api.services.pubsub.model.PubsubMessage;
import com.google.common.collect.ImmutableMap;
import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.TimeZone;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;


/**
 * This is a generator that simulates usage data from a mobile game, and either publishes the data
 * to a pubsub topic or writes it to a file.
 *
 * <p>The general model used by the generator is the following. There is a set of teams with team
 * members. Each member is scoring points for their team. After some period, a team will dissolve
 * and a new one will be created in its place. There is also a set of 'Robots', or spammer users.
 * They hop from team to team. The robots are set to have a higher 'click rate' (generate more
 * events) than the regular team members.
 *
 * <p>Each generated line of data has the following form:
 * username,teamname,score,timestamp_in_ms,readable_time
 * e.g.:
 * user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224
 *
 * <p>The Injector writes either to a PubSub topic, or a file. It will use the PubSub topic if
 * specified. It takes the following arguments:
 * {@code Injector project-name (topic-name|none) (filename|none)}.
 *
 * <p>To run the Injector in the mode where it publishes to PubSub, you will need to authenticate
 * locally using project-based service account credentials to avoid running over PubSub
 * quota.
 * See https://developers.google.com/identity/protocols/application-default-credentials
 * for more information on using service account credentials. Set the GOOGLE_APPLICATION_CREDENTIALS
 * environment variable to point to your downloaded service account credentials before starting the
 * program, e.g.:
 * {@code export GOOGLE_APPLICATION_CREDENTIALS=/path/to/your/credentials-key.json}.
 * If you do not do this, then your injector will only run for a few minutes on your
 * 'user account' credentials before you will start to see quota error messages like:
 * "Request throttled due to user QPS limit being reached", and see this exception:
 * ".com.google.api.client.googleapis.json.GoogleJsonResponseException: 429 Too Many Requests".
 * Once you've set up your credentials, run the Injector like this":
  * <pre>{@code
 * Injector <project-name> <topic-name> none
 * }
 * </pre>
 * The pubsub topic will be created if it does not exist.
 *
 * <p>To run the injector in write-to-file-mode, set the topic name to "none" and specify the
 * filename:
 * <pre>{@code
 * Injector <project-name> none <filename>
 * }
 * </pre>
 */
class Injector {
  private static Pubsub pubsub;
  private static Random random = new Random();
  private static String topic;
  private static String project;
  private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms";

  // QPS ranges from 800 to 1000.
  private static final int MIN_QPS = 800;
  private static final int QPS_RANGE = 200;
  // How long to sleep, in ms, between creation of the threads that make API requests to PubSub.
  private static final int THREAD_SLEEP_MS = 500;

  // Lists used to generate random team names.
  private static final ArrayList<String> COLORS =
      new ArrayList<String>(Arrays.asList(
         "Magenta", "AliceBlue", "Almond", "Amaranth", "Amber",
         "Amethyst", "AndroidGreen", "AntiqueBrass", "Fuchsia", "Ruby", "AppleGreen",
         "Apricot", "Aqua", "ArmyGreen", "Asparagus", "Auburn", "Azure", "Banana",
         "Beige", "Bisque", "BarnRed", "BattleshipGrey"));

  private static final ArrayList<String> ANIMALS =
      new ArrayList<String>(Arrays.asList(
         "Echidna", "Koala", "Wombat", "Marmot", "Quokka", "Kangaroo", "Dingo", "Numbat", "Emu",
         "Wallaby", "CaneToad", "Bilby", "Possum", "Cassowary", "Kookaburra", "Platypus",
         "Bandicoot", "Cockatoo", "Antechinus"));

  // The list of live teams.
  private static ArrayList<TeamInfo> liveTeams = new ArrayList<TeamInfo>();

  private static DateTimeFormatter fmt =
    DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
        .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));


  // The total number of robots in the system.
  private static final int NUM_ROBOTS = 20;
  // Determines the chance that a team will have a robot team member.
  private static final int ROBOT_PROBABILITY = 3;
  private static final int NUM_LIVE_TEAMS = 15;
  private static final int BASE_MEMBERS_PER_TEAM = 5;
  private static final int MEMBERS_PER_TEAM = 15;
  private static final int MAX_SCORE = 20;
  private static final int LATE_DATA_RATE = 5 * 60 * 2;       // Every 10 minutes
  private static final int BASE_DELAY_IN_MILLIS = 5 * 60 * 1000;  // 5-10 minute delay
  private static final int FUZZY_DELAY_IN_MILLIS = 5 * 60 * 1000;

  // The minimum time a 'team' can live.
  private static final int BASE_TEAM_EXPIRATION_TIME_IN_MINS = 20;
  private static final int TEAM_EXPIRATION_TIME_IN_MINS = 20;


  /**
   * A class for holding team info: the name of the team, when it started,
   * and the current team members. Teams may but need not include one robot team member.
   */
  private static class TeamInfo {
    String teamName;
    long startTimeInMillis;
    int expirationPeriod;
    // The team might but need not include 1 robot. Will be non-null if so.
    String robot;
    int numMembers;

    private TeamInfo(String teamName, long startTimeInMillis, String robot) {
      this.teamName = teamName;
      this.startTimeInMillis = startTimeInMillis;
      // How long until this team is dissolved.
      this.expirationPeriod = random.nextInt(TEAM_EXPIRATION_TIME_IN_MINS)
        + BASE_TEAM_EXPIRATION_TIME_IN_MINS;
      this.robot = robot;
      // Determine the number of team members.
      numMembers = random.nextInt(MEMBERS_PER_TEAM) + BASE_MEMBERS_PER_TEAM;
    }

    String getTeamName() {
      return teamName;
    }
    String getRobot() {
      return robot;
    }

    long getStartTimeInMillis() {
      return startTimeInMillis;
    }
    long getEndTimeInMillis() {
      return startTimeInMillis + (expirationPeriod * 60L * 1000L);
    }
    String getRandomUser() {
      int userNum = random.nextInt(numMembers);
      return "user" + userNum + "_" + teamName;
    }

    int numMembers() {
      return numMembers;
    }

    @Override
    public String toString() {
      return "(" + teamName + ", num members: " + numMembers() + ", starting at: "
        + startTimeInMillis + ", expires in: " + expirationPeriod + ", robot: " + robot + ")";
    }
  }

  /** Utility to grab a random element from an array of Strings. */
  private static String randomElement(ArrayList<String> list) {
    int index = random.nextInt(list.size());
    return list.get(index);
  }

  /**
   * Get and return a random team. If the selected team is too old w.r.t its expiration, remove
   * it, replacing it with a new team.
   */
  private static TeamInfo randomTeam(ArrayList<TeamInfo> list) {
    int index = random.nextInt(list.size());
    TeamInfo team = list.get(index);
    // If the selected team is expired, remove it and return a new team.
    long currTime = System.currentTimeMillis();
    if ((team.getEndTimeInMillis() < currTime) || team.numMembers() == 0) {
      System.out.println("\nteam " + team + " is too old; replacing.");
      System.out.println("start time: " + team.getStartTimeInMillis()
        + ", end time: " + team.getEndTimeInMillis()
        + ", current time:" + currTime);
      removeTeam(index);
      // Add a new team in its stead.
      return (addLiveTeam());
    } else {
      return team;
    }
  }

  /**
   * Create and add a team. Possibly add a robot to the team.
   */
  private static synchronized TeamInfo addLiveTeam() {
    String teamName = randomElement(COLORS) + randomElement(ANIMALS);
    String robot = null;
    // Decide if we want to add a robot to the team.
    if (random.nextInt(ROBOT_PROBABILITY) == 0) {
      robot = "Robot-" + random.nextInt(NUM_ROBOTS);
    }
    // Create the new team.
    TeamInfo newTeam = new TeamInfo(teamName, System.currentTimeMillis(), robot);
    liveTeams.add(newTeam);
    System.out.println("[+" + newTeam + "]");
    return newTeam;
  }

  /**
   * Remove a specific team.
   */
  private static synchronized void removeTeam(int teamIndex) {
    TeamInfo removedTeam = liveTeams.remove(teamIndex);
    System.out.println("[-" + removedTeam + "]");
  }

  /** Generate a user gaming event. */
  private static String generateEvent(Long currTime, int delayInMillis) {
    TeamInfo team = randomTeam(liveTeams);
    String teamName = team.getTeamName();
    String user;
    final int parseErrorRate = 900000;

    String robot = team.getRobot();
    // If the team has an associated robot team member...
    if (robot != null) {
      // Then use that robot for the message with some probability.
      // Set this probability to higher than that used to select any of the 'regular' team
      // members, so that if there is a robot on the team, it has a higher click rate.
      if (random.nextInt(team.numMembers() / 2) == 0) {
        user = robot;
      } else {
        user = team.getRandomUser();
      }
    } else { // No robot.
      user = team.getRandomUser();
    }
    String event = user + "," + teamName + "," + random.nextInt(MAX_SCORE);
    // Randomly introduce occasional parse errors.
    if (random.nextInt(parseErrorRate) == 0) {
      System.out.println("Introducing a parse error.");
      event = "THIS LINE REPRESENTS CORRUPT DATA AND WILL CAUSE A PARSE ERROR";
    }
    return addTimeInfoToEvent(event, currTime, delayInMillis);
  }

  /**
   * Add time info to a generated gaming event.
   */
  private static String addTimeInfoToEvent(String message, Long currTime, int delayInMillis) {
    String eventTimeString =
        Long.toString((currTime - delayInMillis) / 1000 * 1000);
    // Add a (redundant) 'human-readable' date string to make the data semantics more clear.
    String dateString = fmt.print(currTime);
    message = message + "," + eventTimeString + "," + dateString;
    return message;
  }

  /**
   * Publish 'numMessages' arbitrary events from live users with the provided delay, to a
   * PubSub topic.
   */
  public static void publishData(int numMessages, int delayInMillis)
      throws IOException {
    List<PubsubMessage> pubsubMessages = new ArrayList<>();

    for (int i = 0; i < Math.max(1, numMessages); i++) {
      Long currTime = System.currentTimeMillis();
      String message = generateEvent(currTime, delayInMillis);
      PubsubMessage pubsubMessage = new PubsubMessage()
              .encodeData(message.getBytes("UTF-8"));
      pubsubMessage.setAttributes(
          ImmutableMap.of(TIMESTAMP_ATTRIBUTE,
              Long.toString((currTime - delayInMillis) / 1000 * 1000)));
      if (delayInMillis != 0) {
        System.out.println(pubsubMessage.getAttributes());
        System.out.println("late data for: " + message);
      }
      pubsubMessages.add(pubsubMessage);
    }

    PublishRequest publishRequest = new PublishRequest();
    publishRequest.setMessages(pubsubMessages);
    pubsub.projects().topics().publish(topic, publishRequest).execute();
  }

  /**
   * Publish generated events to a file.
   */
  public static void publishDataToFile(String fileName, int numMessages, int delayInMillis)
      throws IOException {
    PrintWriter out = new PrintWriter(new OutputStreamWriter(
        new BufferedOutputStream(new FileOutputStream(fileName, true)), "UTF-8"));

    try {
      for (int i = 0; i < Math.max(1, numMessages); i++) {
        Long currTime = System.currentTimeMillis();
        String message = generateEvent(currTime, delayInMillis);
        out.println(message);
      }
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      if (out != null) {
        out.flush();
        out.close();
      }
    }
  }


  public static void main(String[] args) throws IOException, InterruptedException {
    if (args.length < 3) {
      System.out.println("Usage: Injector project-name (topic-name|none) (filename|none)");
      System.exit(1);
    }
    boolean writeToFile = false;
    boolean writeToPubsub = true;
    project = args[0];
    String topicName = args[1];
    String fileName = args[2];
    // The Injector writes either to a PubSub topic, or a file. It will use the PubSub topic if
    // specified; otherwise, it will try to write to a file.
    if (topicName.equalsIgnoreCase("none")) {
      writeToFile = true;
      writeToPubsub = false;
    }
    if (writeToPubsub) {
      // Create the PubSub client.
      pubsub = InjectorUtils.getClient();
      // Create the PubSub topic as necessary.
      topic = InjectorUtils.getFullyQualifiedTopicName(project, topicName);
      InjectorUtils.createTopic(pubsub, topic);
      System.out.println("Injecting to topic: " + topic);
    } else {
      if (fileName.equalsIgnoreCase("none")) {
        System.out.println("Filename not specified.");
        System.exit(1);
      }
      System.out.println("Writing to file: " + fileName);
    }
    System.out.println("Starting Injector");

    // Start off with some random live teams.
    while (liveTeams.size() < NUM_LIVE_TEAMS) {
      addLiveTeam();
    }

    // Publish messages at a rate determined by the QPS and Thread sleep settings.
    for (int i = 0; true; i++) {
      if (Thread.activeCount() > 10) {
        System.err.println("I'm falling behind!");
      }

      // Decide if this should be a batch of late data.
      final int numMessages;
      final int delayInMillis;
      if (i % LATE_DATA_RATE == 0) {
        // Insert delayed data for one user (one message only)
        delayInMillis = BASE_DELAY_IN_MILLIS + random.nextInt(FUZZY_DELAY_IN_MILLIS);
        numMessages = 1;
        System.out.println("DELAY(" + delayInMillis + ", " + numMessages + ")");
      } else {
        System.out.print(".");
        delayInMillis = 0;
        numMessages = MIN_QPS + random.nextInt(QPS_RANGE);
      }

      if (writeToFile) { // Won't use threading for the file write.
        publishDataToFile(fileName, numMessages, delayInMillis);
      } else { // Write to PubSub.
        // Start a thread to inject some data.
        new Thread(){
          @Override
          public void run() {
            try {
              publishData(numMessages, delayInMillis);
            } catch (IOException e) {
              System.err.println(e);
            }
          }
        }.start();
      }

      // Wait before creating another injector thread.
      Thread.sleep(THREAD_SLEEP_MS);
    }
  }
}
