/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.accumulo.examples.reservations;

import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.ConditionalWriter.Status;
import org.apache.accumulo.core.client.ConditionalWriterConfig;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Condition;
import org.apache.accumulo.core.data.ConditionalMutation;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Accumulo Reservation System : An example reservation system using Accumulo. Supports atomic
 * reservations of a resource at a date. Wait list are also supported. In order to keep the example
 * simple, no checking is done of the date. Also the code is inefficient, if interested in improving
 * it take a look at the EXCERCISE comments.
 */

// EXCERCISE create a test that verifies correctness under concurrency. For example, have M threads
// making reservations against N resources. Each thread could randomly reserve and cancel resources
// for a single user. When each thread finishes, it knows what the state of its single user should
// be. When all threads finish, collect their expected state and verify the status of all users and
// resources. For extra credit run the test on a IAAS provider using 10 nodes and 10 threads per
// node.

public class ARS {

  private static final Logger log = LoggerFactory.getLogger(ARS.class);

  private AccumuloClient client;
  private String rTable;

  public enum ReservationResult {
    RESERVED, WAIT_LISTED
  }

  public ARS(AccumuloClient client, String rTable) {
    this.client = client;
    this.rTable = rTable;
  }

  public List<String> setCapacity(String what, String when, int count) {
    // EXCERCISE implement this method which atomically sets a capacity and returns anyone who was
    // moved to the wait list if the capacity was decreased

    throw new UnsupportedOperationException();
  }

  public ReservationResult reserve(String what, String when, String who) throws Exception {

    String row = what + ":" + when;

    // EXCERCISE This code assumes there is no reservation and tries to create one. If a reservation
    // exist then the update will fail. This is a good strategy when it is expected there are
    // usually no reservations. Could modify the code to scan first.

    // The following mutation requires that the column tx:seq does not exist and will fail if it
    // does.
    ConditionalMutation update = new ConditionalMutation(row, new Condition("tx", "seq"));
    update.put("tx", "seq", "0");
    update.put("res", String.format("%04d", 0), who);

    ReservationResult result = ReservationResult.RESERVED;

    // it is important to use an isolated scanner so that only whole mutations are seen
    try (
        ConditionalWriter cwriter = client.createConditionalWriter(rTable,
            new ConditionalWriterConfig());
        Scanner scanner = new IsolatedScanner(client.createScanner(rTable, Authorizations.EMPTY))) {
      while (true) {
        Status status = cwriter.write(update).getStatus();
        switch (status) {
          case ACCEPTED:
            return result;
          case REJECTED:
          case UNKNOWN:
            // read the row and decide what to do
            break;
          default:
            throw new RuntimeException("Unexpected status " + status);
        }

        // EXCERCISE in the case of many threads trying to reserve a slot, this approach of
        // immediately retrying is inefficient. Exponential back-off is good general solution to
        // solve contention problems like this. However in this particular case, exponential
        // back-off could penalize the earliest threads that attempted to make a reservation by
        // putting them later in the list. A more complex solution could involve having independent
        // sub-queues within the row that approximately maintain arrival order and use exponential
        // back off to fairly merge the sub-queues into the main queue.

        scanner.setRange(new Range(row));

        int seq = -1;
        int maxReservation = -1;

        for (Entry<Key,Value> entry : scanner) {
          String cf = entry.getKey().getColumnFamilyData().toString();
          String cq = entry.getKey().getColumnQualifierData().toString();
          String val = entry.getValue().toString();

          if (cf.equals("tx") && cq.equals("seq")) {
            seq = Integer.parseInt(val);
          } else if (cf.equals("res")) {
            // EXCERCISE scanning the entire list to find if reserver is already in the list is
            // inefficient. One possible way to solve this would be to sort the data differently in
            // Accumulo so that finding the reserver could be done quickly.
            if (val.equals(who))
              if (maxReservation == -1)
                return ReservationResult.RESERVED; // already have the first reservation
              else
                return ReservationResult.WAIT_LISTED; // already on wait list

            // EXCERCISE the way this code finds the max reservation is very inefficient.... it
            // would be better if it did not have to scan the entire row. One possibility is to just
            // use the sequence number. Could also consider sorting the data in another way and/or
            // using an iterator.
            maxReservation = Integer.parseInt(cq);
          }
        }

        Condition condition = new Condition("tx", "seq");
        if (seq >= 0)
          condition.setValue(seq + ""); // only expect a seq # if one was seen

        update = new ConditionalMutation(row, condition);
        update.put("tx", "seq", (seq + 1) + "");
        update.put("res", String.format("%04d", maxReservation + 1), who);

        // EXCERCISE if set capacity is implemented, then result should take capacity into account
        if (maxReservation == -1)
          result = ReservationResult.RESERVED; // if successful, will be first reservation
        else
          result = ReservationResult.WAIT_LISTED;
      }
    }
  }

  public void cancel(String what, String when, String who) throws Exception {

    String row = what + ":" + when;

    // Even though this method is only deleting a column, its important to use a conditional writer.
    // By updating the seq # when deleting a reservation, it
    // will cause any concurrent reservations to retry. If this delete were done using a batch
    // writer, then a concurrent reservation could report WAIT_LISTED
    // when it actually got the reservation.

    // its important to use an isolated scanner so that only whole mutations are seen
    try (
        ConditionalWriter cwriter = client.createConditionalWriter(rTable,
            new ConditionalWriterConfig());
        Scanner scanner = new IsolatedScanner(client.createScanner(rTable, Authorizations.EMPTY))) {
      while (true) {
        scanner.setRange(new Range(row));

        int seq = -1;
        String reservation = null;

        for (Entry<Key,Value> entry : scanner) {
          String cf = entry.getKey().getColumnFamilyData().toString();
          String cq = entry.getKey().getColumnQualifierData().toString();
          String val = entry.getValue().toString();

          // EXCERCISE avoid linear scan

          if (cf.equals("tx") && cq.equals("seq")) {
            seq = Integer.parseInt(val);
          } else if (cf.equals("res") && val.equals(who)) {
            reservation = cq;
          }
        }

        if (reservation != null) {
          ConditionalMutation update = new ConditionalMutation(row,
              new Condition("tx", "seq").setValue(seq + ""));
          update.putDelete("res", reservation);
          update.put("tx", "seq", (seq + 1) + "");

          Status status = cwriter.write(update).getStatus();
          switch (status) {
            case ACCEPTED:
              // successfully canceled reservation
              return;
            case REJECTED:
            case UNKNOWN:
              // retry
              // EXCERCISE exponential back-off could be used here
              break;
            default:
              throw new RuntimeException("Unexpected status " + status);
          }

        } else {
          // not reserved, nothing to do
          break;
        }

      }
    }
  }

  public List<String> list(String what, String when) throws Exception {
    String row = what + ":" + when;

    // its important to use an isolated scanner so that only whole mutations are seen
    try (
        Scanner scanner = new IsolatedScanner(client.createScanner(rTable, Authorizations.EMPTY))) {
      scanner.setRange(new Range(row));
      scanner.fetchColumnFamily(new Text("res"));

      List<String> reservations = new ArrayList<>();

      for (Entry<Key,Value> entry : scanner) {
        String val = entry.getValue().toString();
        reservations.add(val);
      }

      return reservations;
    }
  }

  public static void main(String[] args) throws Exception {
    var console = System.console();
    var out = System.out;
    ARS ars = null;

    while (true) {
      String line = console.readLine(">");
      if (line == null)
        break;

      final String[] tokens = line.split("\\s+");

      if (tokens[0].equals("reserve") && tokens.length >= 4 && ars != null) {
        // start up multiple threads all trying to reserve the same resource, no more than one
        // should succeed

        final ARS fars = ars;
        ArrayList<Thread> threads = new ArrayList<>();
        for (int i = 3; i < tokens.length; i++) {
          final int whoIndex = i;
          Runnable reservationTask = new Runnable() {
            @Override
            public void run() {
              try {
                out.println("  " + String.format("%20s", tokens[whoIndex]) + " : "
                    + fars.reserve(tokens[1], tokens[2], tokens[whoIndex]));
              } catch (Exception e) {
                log.warn("Could not write to the ConsoleReader.", e);
              }
            }
          };

          threads.add(new Thread(reservationTask));
        }

        for (Thread thread : threads)
          thread.start();

        for (Thread thread : threads)
          thread.join();

      } else if (tokens[0].equals("cancel") && tokens.length == 4 && ars != null) {
        ars.cancel(tokens[1], tokens[2], tokens[3]);
      } else if (tokens[0].equals("list") && tokens.length == 3 && ars != null) {
        List<String> reservations = ars.list(tokens[1], tokens[2]);
        if (reservations.size() > 0) {
          out.println("  Reservation holder : " + reservations.get(0));
          if (reservations.size() > 1)
            out.println("  Wait list : " + reservations.subList(1, reservations.size()));
        }
      } else if (tokens[0].equals("quit") && tokens.length == 1) {
        break;
      } else if (tokens[0].equals("connect") && tokens.length == 6 && ars == null) {

        // the client can't be closed here, because it is passed to the new ARS object
        AccumuloClient client = Accumulo.newClient().to(tokens[1], tokens[2])
            .as(tokens[3], tokens[4]).build();
        if (client.tableOperations().exists(tokens[5])) {
          ars = new ARS(client, tokens[5]);
          out.println("  connected");
        } else {
          out.println("  No Such Table");
        }
      } else {
        System.out.println("  Commands : ");
        if (ars == null) {
          out.println("    connect <instance> <zookeepers> <user> <pass> <table>");
        } else {
          out.println("    reserve <what> <when> <who> {who}");
          out.println("    cancel <what> <when> <who>");
          out.println("    list <what> <when>");
        }
      }
    }
  }
}
