static Column NC = new Column("ntyf", "sum"); static Column TOTAL_COL = new Column("sum", "total"); static Column UPDATE_COL = new Column("sum", "update"); static Column CONTINUE_COL = new Column("sum", "continue"); public static class SummingObserver implements StringObserver { private int maxToProcess; SummingObserver(int maxToProcess) { this.maxToProcess = maxToProcess; } @Override public void process(TransactionBase tx, String row, Column col) throws Exception { Map<Column, String> colVals = tx.gets(row, TOTAL_COL, CONTINUE_COL); int sum = Integer.parseInt(colVals.getOrDefault(TOTAL_COL, "0")); // construct a scan range that uses the continue row String startRow = colVals.getOrDefault(CONTINUE_COL, row + "/"); String endRow = row + "/:"; // after the character '9' comes ':' CellScanner scanner = tx.scanner().over(new Span(startRow, true, endRow, false)).build(); int processed = 0; for (RowColumnValue rcv : scanner) { if (processed >= maxToProcess) { // stop processing and set the continue row tx.set(row, CONTINUE_COL, rcv.getsRow()); tx.setWeakNotification(row, col); break; } sum += Integer.parseInt(rcv.getsValue()); tx.delete(rcv.getRow(), rcv.getColumn()); processed++; } System.out.println("sum : " + sum + " start: " + startRow + " processed: " + processed); tx.set(row, TOTAL_COL, "" + sum); // if did not set the continue column and it exists, then delete it if (processed < maxToProcess && colVals.containsKey(CONTINUE_COL)) { tx.delete(row, CONTINUE_COL); // need to start over at the beginning and see if there is new data before the continue // column tx.setWeakNotification(row, col); } } } public static class TourObserverProvider implements ObserverProvider { @Override public void provide(Registry obsRegistry, Context ctx) { int maxToProcess = ctx.getAppConfiguration().getInt("maxToProcess"); obsRegistry.forColumn(NC, NotificationType.WEAK) .useObserver(new SummingObserver(maxToProcess)); } } private static void preInit(FluoConfiguration fluoConfig) { fluoConfig.getAppConfiguration().setProperty("maxToProcess", 500); fluoConfig.setObserverProvider(TourObserverProvider.class); } private static void exercise(MiniFluo mini, FluoClient client) { try (LoaderExecutor le = client.newLoaderExecutor()) { Random r = new Random(42); for (int i = 0; i < 5000; i++) { // The Loader interface only has one function and can therefore be written as a lambda // below. le.execute((tx, ctx) -> { String row = "counter001/" + String.format("%07d", r.nextInt(10000000)); int curVal = Integer.parseInt(tx.gets(row, UPDATE_COL, "0")); tx.set(row, UPDATE_COL, curVal + 1 + ""); tx.setWeakNotification("counter001", NC); }); } } mini.waitForObservers(); try (Snapshot snap = client.newSnapshot()) { System.out.println("final sum : " + snap.gets("counter001", TOTAL_COL)); } }
The code above will print something like the following.
$ mvn -q clean compile exec:java Starting MiniFluo ... started. sum : 500 start: counter001/ processed: 500 sum : 891 start: counter001/7945963 processed: 390 sum : 1391 start: counter001/ processed: 500 sum : 1891 start: counter001/2938489 processed: 500 sum : 2391 start: counter001/5210523 processed: 500 sum : 2892 start: counter001/6912090 processed: 500 sum : 3392 start: counter001/8410312 processed: 500 sum : 3398 start: counter001/9991522 processed: 6 sum : 3898 start: counter001/ processed: 500 sum : 4398 start: counter001/1824962 processed: 500 sum : 4899 start: counter001/4076664 processed: 500 sum : 5000 start: counter001/6993690 processed: 101 sum : 5000 start: counter001/ processed: 0 final sum : 5000