blob: 1836780e5838b324b67ebe922f3557ae3335503f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.examples.reservations;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.ConditionalWriter.Status;
import org.apache.accumulo.core.client.ConditionalWriterConfig;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Condition;
import org.apache.accumulo.core.data.ConditionalMutation;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Accumulo Reservation System : An example reservation system using Accumulo. Supports atomic
* reservations of a resource at a date. Wait list are also supported. In order to keep the example
* simple, no checking is done of the date. Also the code is inefficient, if interested in improving
* it take a look at the EXCERCISE comments.
*/
// EXCERCISE create a test that verifies correctness under concurrency. For example, have M threads
// making reservations against N resources. Each thread could randomly reserve and cancel resources
// for a single user. When each thread finishes, it knows what the state of its single user should
// be. When all threads finish, collect their expected state and verify the status of all users and
// resources. For extra credit run the test on a IAAS provider using 10 nodes and 10 threads per
// node.
public class ARS {
private static final Logger log = LoggerFactory.getLogger(ARS.class);
private AccumuloClient client;
private String rTable;
public enum ReservationResult {
RESERVED, WAIT_LISTED
}
public ARS(AccumuloClient client, String rTable) {
this.client = client;
this.rTable = rTable;
}
public List<String> setCapacity(String what, String when, int count) {
// EXCERCISE implement this method which atomically sets a capacity and returns anyone who was
// moved to the wait list if the capacity was decreased
throw new UnsupportedOperationException();
}
public ReservationResult reserve(String what, String when, String who) throws Exception {
String row = what + ":" + when;
// EXCERCISE This code assumes there is no reservation and tries to create one. If a reservation
// exist then the update will fail. This is a good strategy when it is expected there are
// usually no reservations. Could modify the code to scan first.
// The following mutation requires that the column tx:seq does not exist and will fail if it
// does.
ConditionalMutation update = new ConditionalMutation(row, new Condition("tx", "seq"));
update.put("tx", "seq", "0");
update.put("res", String.format("%04d", 0), who);
ReservationResult result = ReservationResult.RESERVED;
// it is important to use an isolated scanner so that only whole mutations are seen
try (
ConditionalWriter cwriter = client.createConditionalWriter(rTable,
new ConditionalWriterConfig());
Scanner scanner = new IsolatedScanner(client.createScanner(rTable, Authorizations.EMPTY))) {
while (true) {
Status status = cwriter.write(update).getStatus();
switch (status) {
case ACCEPTED:
return result;
case REJECTED:
case UNKNOWN:
// read the row and decide what to do
break;
default:
throw new RuntimeException("Unexpected status " + status);
}
// EXCERCISE in the case of many threads trying to reserve a slot, this approach of
// immediately retrying is inefficient. Exponential back-off is good general solution to
// solve contention problems like this. However in this particular case, exponential
// back-off could penalize the earliest threads that attempted to make a reservation by
// putting them later in the list. A more complex solution could involve having independent
// sub-queues within the row that approximately maintain arrival order and use exponential
// back off to fairly merge the sub-queues into the main queue.
scanner.setRange(new Range(row));
int seq = -1;
int maxReservation = -1;
for (Entry<Key,Value> entry : scanner) {
String cf = entry.getKey().getColumnFamilyData().toString();
String cq = entry.getKey().getColumnQualifierData().toString();
String val = entry.getValue().toString();
if (cf.equals("tx") && cq.equals("seq")) {
seq = Integer.parseInt(val);
} else if (cf.equals("res")) {
// EXCERCISE scanning the entire list to find if reserver is already in the list is
// inefficient. One possible way to solve this would be to sort the data differently in
// Accumulo so that finding the reserver could be done quickly.
if (val.equals(who))
if (maxReservation == -1)
return ReservationResult.RESERVED; // already have the first reservation
else
return ReservationResult.WAIT_LISTED; // already on wait list
// EXCERCISE the way this code finds the max reservation is very inefficient.... it
// would be better if it did not have to scan the entire row. One possibility is to just
// use the sequence number. Could also consider sorting the data in another way and/or
// using an iterator.
maxReservation = Integer.parseInt(cq);
}
}
Condition condition = new Condition("tx", "seq");
if (seq >= 0)
condition.setValue(seq + ""); // only expect a seq # if one was seen
update = new ConditionalMutation(row, condition);
update.put("tx", "seq", (seq + 1) + "");
update.put("res", String.format("%04d", maxReservation + 1), who);
// EXCERCISE if set capacity is implemented, then result should take capacity into account
if (maxReservation == -1)
result = ReservationResult.RESERVED; // if successful, will be first reservation
else
result = ReservationResult.WAIT_LISTED;
}
}
}
public void cancel(String what, String when, String who) throws Exception {
String row = what + ":" + when;
// Even though this method is only deleting a column, its important to use a conditional writer.
// By updating the seq # when deleting a reservation, it
// will cause any concurrent reservations to retry. If this delete were done using a batch
// writer, then a concurrent reservation could report WAIT_LISTED
// when it actually got the reservation.
// its important to use an isolated scanner so that only whole mutations are seen
try (
ConditionalWriter cwriter = client.createConditionalWriter(rTable,
new ConditionalWriterConfig());
Scanner scanner = new IsolatedScanner(client.createScanner(rTable, Authorizations.EMPTY))) {
while (true) {
scanner.setRange(new Range(row));
int seq = -1;
String reservation = null;
for (Entry<Key,Value> entry : scanner) {
String cf = entry.getKey().getColumnFamilyData().toString();
String cq = entry.getKey().getColumnQualifierData().toString();
String val = entry.getValue().toString();
// EXCERCISE avoid linear scan
if (cf.equals("tx") && cq.equals("seq")) {
seq = Integer.parseInt(val);
} else if (cf.equals("res") && val.equals(who)) {
reservation = cq;
}
}
if (reservation != null) {
ConditionalMutation update = new ConditionalMutation(row,
new Condition("tx", "seq").setValue(seq + ""));
update.putDelete("res", reservation);
update.put("tx", "seq", (seq + 1) + "");
Status status = cwriter.write(update).getStatus();
switch (status) {
case ACCEPTED:
// successfully canceled reservation
return;
case REJECTED:
case UNKNOWN:
// retry
// EXCERCISE exponential back-off could be used here
break;
default:
throw new RuntimeException("Unexpected status " + status);
}
} else {
// not reserved, nothing to do
break;
}
}
}
}
public List<String> list(String what, String when) throws Exception {
String row = what + ":" + when;
// its important to use an isolated scanner so that only whole mutations are seen
try (
Scanner scanner = new IsolatedScanner(client.createScanner(rTable, Authorizations.EMPTY))) {
scanner.setRange(new Range(row));
scanner.fetchColumnFamily("res");
List<String> reservations = new ArrayList<>();
for (Entry<Key,Value> entry : scanner) {
String val = entry.getValue().toString();
reservations.add(val);
}
return reservations;
}
}
public static void main(String[] args) throws Exception {
var console = System.console();
var out = System.out;
ARS ars = null;
while (true) {
String line = console.readLine(">");
if (line == null)
break;
final String[] tokens = line.split("\\s+");
if (tokens[0].equals("reserve") && tokens.length >= 4 && ars != null) {
// start up multiple threads all trying to reserve the same resource, no more than one
// should succeed
final ARS fars = ars;
ArrayList<Thread> threads = new ArrayList<>();
for (int i = 3; i < tokens.length; i++) {
final int whoIndex = i;
Runnable reservationTask = new Runnable() {
@Override
public void run() {
try {
out.println(" " + String.format("%20s", tokens[whoIndex]) + " : "
+ fars.reserve(tokens[1], tokens[2], tokens[whoIndex]));
} catch (Exception e) {
log.warn("Could not write to the ConsoleReader.", e);
}
}
};
threads.add(new Thread(reservationTask));
}
for (Thread thread : threads)
thread.start();
for (Thread thread : threads)
thread.join();
} else if (tokens[0].equals("cancel") && tokens.length == 4 && ars != null) {
ars.cancel(tokens[1], tokens[2], tokens[3]);
} else if (tokens[0].equals("list") && tokens.length == 3 && ars != null) {
List<String> reservations = ars.list(tokens[1], tokens[2]);
if (reservations.size() > 0) {
out.println(" Reservation holder : " + reservations.get(0));
if (reservations.size() > 1)
out.println(" Wait list : " + reservations.subList(1, reservations.size()));
}
} else if (tokens[0].equals("quit") && tokens.length == 1) {
break;
} else if (tokens[0].equals("connect") && tokens.length == 6 && ars == null) {
// the client can't be closed here, because it is passed to the new ARS object
AccumuloClient client = Accumulo.newClient().to(tokens[1], tokens[2])
.as(tokens[3], tokens[4]).build();
if (client.tableOperations().exists(tokens[5])) {
ars = new ARS(client, tokens[5]);
out.println(" connected");
} else {
out.println(" No Such Table");
}
} else {
System.out.println(" Commands : ");
if (ars == null) {
out.println(" connect <instance> <zookeepers> <user> <pass> <table>");
} else {
out.println(" reserve <what> <when> <who> {who}");
out.println(" cancel <what> <when> <who>");
out.println(" list <what> <when>");
}
}
}
}
}