Apache Fluo Recipes builds on the Apache Fluo API to provide libraries of common code for Fluo developers.
Below are resources for this release:
* Download a release tarball and verify it by these procedures using these KEYS.

  | fluo-recipes-1.1.0-incubating-source-release.tar.gz | ASC MD5 SHA |

* View the documentation.
* Read the javadocs: core, accumulo, kryo, spark, test.
* Jars are available in Maven Central.
* View the changes.
For this release of Fluo Recipes, the most significant change was the work done in #127, #128, #130, and #131 to support the new Observer API. Both the Collision Free Map and the [Export Queue] required significant additions to support it. Since the name Collision Free Map (CFM) is awful and it needed major API additions, the decision was made to deprecate it and offer the [CombineQueue] instead. The CombineQueue offers the same functionality as the CFM, but supports only the new Observer API. The deprecated CFM still supports the old Observer API. For the Export Queue, additions were made to its API and everything related to the old Observer API was deprecated. All API changes in this release are backwards compatible with the 1.0.0 release.
The new APIs in this release are much easier to use and now offer the ability to use lambdas. The example below attempts to show this. It does the following:

* Counts events at each 3D coordinate `(x,y,t)`.
* Projects those counts onto the 2D coordinates `(x,y)`, `(x,t)`, and `(y,t)`.

To illustrate what this example accomplishes, consider the following inputs:

* `2` events at `(x=3,y=3,t=5)`
* `1` event at `(x=3,y=3,t=5)`
* `4` events at `(x=7,y=3,t=5)`
The example code should compute the following:

* `3` events at `(x=3,y=3,t=5)`
* `4` events at `(x=7,y=3,t=5)`
* `3` events at `(x=3,y=3)`
* `4` events at `(x=7,y=3)`
* `3` events at `(x=3,t=5)`
* `4` events at `(x=7,t=5)`
* `7` events at `(y=3,t=5)`
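The arithmetic behind these counts can be checked without Fluo. The following is a minimal plain-Java sketch, not the recipe code: the names `ProjectionSketch`, `sum3d`, and `project` are hypothetical, and the `x=3:y=3:t=5` key format is borrowed from the example code in this post.

```java
import java.util.Map;
import java.util.TreeMap;

public class ProjectionSketch {

  /** Sums event counts per 3D key. Keys are assumed to look like "x=3:y=3:t=5". */
  public static Map<String, Long> sum3d(String[] keys, long[] counts) {
    Map<String, Long> totals = new TreeMap<>();
    for (int i = 0; i < keys.length; i++) {
      totals.merge(keys[i], counts[i], Long::sum);
    }
    return totals;
  }

  /** Projects 3D totals onto the two fields at indexes a and b (0 = x, 1 = y, 2 = t). */
  public static Map<String, Long> project(Map<String, Long> totals3d, int a, int b) {
    Map<String, Long> totals2d = new TreeMap<>();
    for (Map.Entry<String, Long> e : totals3d.entrySet()) {
      String[] fields = e.getKey().split(":");
      totals2d.merge(fields[a] + ":" + fields[b], e.getValue(), Long::sum);
    }
    return totals2d;
  }

  public static void main(String[] args) {
    // The three inputs listed above.
    String[] keys = {"x=3:y=3:t=5", "x=3:y=3:t=5", "x=7:y=3:t=5"};
    long[] counts = {2, 1, 4};

    Map<String, Long> xyt = sum3d(keys, counts);
    System.out.println("xyt " + xyt);
    System.out.println("xy  " + project(xyt, 0, 1));
    System.out.println("xt  " + project(xyt, 0, 2));
    System.out.println("yt  " + project(xyt, 1, 2));
  }
}
```

Unlike this sketch, which recomputes every projection from the full 3D totals, the Fluo example works incrementally and forwards only the deltas of changed counts.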
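The example achieves this using recipes as follows: one CombineQueue accumulates the 3D counts, its change observer forwards the count deltas to three CombineQueues that hold the 2D counts, and every count change is queued on an Export Queue.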
Below is the Fluo [ObserverProvider] that wires everything together. The new Fluo and Fluo Recipes APIs enable wiring everything in Java code. In previous versions, this would have been a cumbersome combination of configuration and Java code. The new APIs also make lambdas an option, which the old APIs did not.
```java
// Imports assume the 1.1.0-incubating package layout.
import java.util.HashMap;
import java.util.Map;

import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.observer.ObserverProvider;
import org.apache.fluo.recipes.core.combine.ChangeObserver;
import org.apache.fluo.recipes.core.combine.ChangeObserver.Change;
import org.apache.fluo.recipes.core.combine.CombineQueue;
import org.apache.fluo.recipes.core.combine.SummingCombiner;
import org.apache.fluo.recipes.core.export.ExportQueue;
import org.apache.fluo.recipes.core.export.SequencedExport;
import org.apache.fluo.recipes.core.export.function.Exporter;

public class AppObserverProvider implements ObserverProvider {

  @Override
  public void provide(Registry obsRegistry, Context ctx) {
    SimpleConfiguration appCfg = ctx.getAppConfiguration();

    CombineQueue<String, Long> xytCq = CombineQueue.getInstance(Example.CQ_XYT_ID, appCfg);
    CombineQueue<String, Long> xyCq = CombineQueue.getInstance(Example.CQ_XY_ID, appCfg);
    CombineQueue<String, Long> ytCq = CombineQueue.getInstance(Example.CQ_YT_ID, appCfg);
    CombineQueue<String, Long> xtCq = CombineQueue.getInstance(Example.CQ_XT_ID, appCfg);

    ExportQueue<String, String> exportQ = ExportQueue.getInstance(Example.EXPORTQ_ID, appCfg);

    // Some of the lambdas below could be inlined. They were kept separate to make the example
    // clearer by showing the types involved.

    // This is called by a combine queue when a value changes. The old and new value for the key
    // are passed. The lambda below queues changes for export.
    ChangeObserver<String, Long> expChangeObs = (tx, changes) -> {
      for (Change<String, Long> change : changes) {
        String oldVal = change.getOldValue().map(v -> "old: " + v).orElse("old: -");
        String newVal = change.getNewValue().map(v -> "new: " + v).orElse("new: -");
        exportQ.add(tx, change.getKey(), oldVal + " " + newVal);
      }
    };

    // This lambda processes changes to 3D counts. It queues updates to the (x,y), (x,t), and (y,t)
    // 2D combine queues. For example if (x=3,y=2,t=5) changed from 4 to 7, it would queue
    // (x=3,y=2):+3, (x=3,t=5):+3, and (y=2,t=5):+3 to the 2D combine queues. The lambda also queues
    // exports for 3D count changes.
    ChangeObserver<String, Long> projectingChangeObs = (tx, changes) -> {
      Map<String, Long> xtUpdates = new HashMap<>();
      Map<String, Long> ytUpdates = new HashMap<>();
      Map<String, Long> xyUpdates = new HashMap<>();

      for (Change<String, Long> change : changes) {
        String[] fields = change.getKey().split(":");
        long delta = change.getNewValue().orElse(0L) - change.getOldValue().orElse(0L);

        // While processing the changes for an entire bucket, opportunistically merge multiple
        // updates to the same 2D coordinates.
        xtUpdates.merge(fields[0] + ":" + fields[2], delta, Long::sum);
        ytUpdates.merge(fields[1] + ":" + fields[2], delta, Long::sum);
        xyUpdates.merge(fields[0] + ":" + fields[1], delta, Long::sum);
      }

      // Queue updates to 2D combine queues.
      xtCq.addAll(tx, xtUpdates);
      ytCq.addAll(tx, ytUpdates);
      xyCq.addAll(tx, xyUpdates);

      // Queue changes for export.
      expChangeObs.process(tx, changes);
    };

    // Register observer for 3D combine queue. The observer calls the provided combiner and
    // change observer when processing queued entries.
    xytCq.registerObserver(obsRegistry, new SummingCombiner<>(), projectingChangeObs);

    // Register observers for all 2D combine queues.
    xyCq.registerObserver(obsRegistry, new SummingCombiner<>(), expChangeObs);
    xtCq.registerObserver(obsRegistry, new SummingCombiner<>(), expChangeObs);
    ytCq.registerObserver(obsRegistry, new SummingCombiner<>(), expChangeObs);

    // This functional interface is new in this release. The lambda below prints out data that was
    // successfully queued for export.
    Exporter<String, String> exporter = iter -> {
      while (iter.hasNext()) {
        SequencedExport<String, String> seqExport = iter.next();
        System.out.printf("EXPORT %-15s %-15s seq: %d\n", seqExport.getKey(),
            seqExport.getValue(), seqExport.getSequence());
      }
    };

    // Register an observer to process queued export entries. The observer will call the lambda
    // created above.
    exportQ.registerObserver(obsRegistry, exporter);
  }
}
```
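The code below does three things: it configures the application and starts MiniFluo, queues an initial batch of updates and waits for the observers to finish, and then queues a second batch of updates and waits again.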
```java
FluoConfiguration props = new FluoConfiguration();
props.setApplicationName("dimensions");
props.setMiniDataDir("target/mini");

CombineQueue.configure(CQ_XYT_ID).keyType(String.class).valueType(Long.class).buckets(7).save(props);
CombineQueue.configure(CQ_XT_ID).keyType(String.class).valueType(Long.class).buckets(7).save(props);
CombineQueue.configure(CQ_XY_ID).keyType(String.class).valueType(Long.class).buckets(7).save(props);
CombineQueue.configure(CQ_YT_ID).keyType(String.class).valueType(Long.class).buckets(7).save(props);

// A new Fluent method of configuring export queues was introduced in 1.1.0
ExportQueue.configure(EXPORTQ_ID).keyType(String.class).valueType(String.class).buckets(7).save(props);

props.setObserverProvider(AppObserverProvider.class);

FileUtils.deleteQuietly(new File("target/mini"));

try (MiniFluo miniFluo = FluoFactory.newMiniFluo(props);
    FluoClient fc = FluoFactory.newClient(miniFluo.getClientConfiguration())) {

  CombineQueue<String,Long> xytCq = CombineQueue.getInstance(CQ_XYT_ID, fc.getAppConfiguration());

  Map<String,Long> updates = new HashMap<>();
  updates.put("x=3:y=5:t=23", 1L);
  updates.put("x=5:y=5:t=27", 1L);
  updates.put("x=3:y=5:t=27", 1L);

  try (Transaction tx = fc.newTransaction()) {
    xytCq.addAll(tx, updates);
    tx.commit();
  }

  miniFluo.waitForObservers();
  System.out.println("\n*** All notifications processed. ***\n");

  updates.clear();
  updates.put("x=3:y=5:t=23", 1L);
  updates.put("x=5:y=5:t=27", -1L);
  updates.put("x=3:y=5:t=29", 1L);

  try (Transaction tx = fc.newTransaction()) {
    xytCq.addAll(tx, updates);
    tx.commit();
  }

  miniFluo.waitForObservers();
  System.out.println("\n*** All notifications processed. ***\n");
}
```
Below is the output of running this example.
```
EXPORT x=3:y=5:t=23    old: - new: 1   seq: 8
EXPORT x=3:y=5:t=27    old: - new: 1   seq: 9
EXPORT x=5:y=5:t=27    old: - new: 1   seq: 9
EXPORT x=3:y=5         old: - new: 2   seq: 37
EXPORT y=5:t=27        old: - new: 2   seq: 42
EXPORT x=3:t=23        old: - new: 1   seq: 36
EXPORT x=5:t=27        old: - new: 1   seq: 36
EXPORT x=3:t=27        old: - new: 1   seq: 38
EXPORT x=5:y=5         old: - new: 1   seq: 39
EXPORT y=5:t=23        old: - new: 1   seq: 41

*** All notifications processed. ***

EXPORT x=3:y=5:t=29    old: - new: 1   seq: 92
EXPORT x=5:y=5:t=27    old: 1 new: -   seq: 92
EXPORT x=3:y=5:t=23    old: 1 new: 2   seq: 93
EXPORT y=5:t=27        old: 2 new: 1   seq: 109
EXPORT x=3:y=5         old: 2 new: 4   seq: 110
EXPORT y=5:t=23        old: 1 new: 2   seq: 111
EXPORT y=5:t=29        old: - new: 1   seq: 108
EXPORT x=3:t=29        old: - new: 1   seq: 105
EXPORT x=3:t=23        old: 1 new: 2   seq: 106
EXPORT x=5:y=5         old: 1 new: -   seq: 107
EXPORT x=5:t=27        old: 1 new: -   seq: 106

*** All notifications processed. ***
```
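The old and new values on the 3D lines of this output can be reproduced with a small plain-Java sketch that replays the two update batches against an in-memory map. It is only an illustration under stated assumptions: `ChangeTrackingSketch` and `apply` are hypothetical names, no Fluo classes are involved, and the rule that a sum of zero removes the key is inferred from the `new: -` lines above rather than taken from the library source.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

public class ChangeTrackingSketch {

  // Current count per key. A key that is absent has no value.
  private final Map<String, Long> current = new TreeMap<>();

  /** Applies a batch of deltas and returns one "key old: .. new: .." line per changed key. */
  public List<String> apply(Map<String, Long> deltas) {
    List<String> lines = new ArrayList<>();
    Map<String, Long> sorted = new TreeMap<>(deltas); // stable iteration order
    for (Map.Entry<String, Long> e : sorted.entrySet()) {
      String key = e.getKey();
      Optional<Long> oldVal = Optional.ofNullable(current.get(key));
      long sum = oldVal.orElse(0L) + e.getValue();

      // Assumption inferred from the output above: a sum of zero removes the key.
      Optional<Long> newVal = sum == 0 ? Optional.empty() : Optional.of(sum);
      if (newVal.isPresent()) {
        current.put(key, sum);
      } else {
        current.remove(key);
      }

      if (!oldVal.equals(newVal)) {
        String oldStr = oldVal.map(v -> "old: " + v).orElse("old: -");
        String newStr = newVal.map(v -> "new: " + v).orElse("new: -");
        lines.add(key + " " + oldStr + " " + newStr);
      }
    }
    return lines;
  }

  public static void main(String[] args) {
    ChangeTrackingSketch sketch = new ChangeTrackingSketch();

    // First batch of updates from the example.
    Map<String, Long> updates = new HashMap<>();
    updates.put("x=3:y=5:t=23", 1L);
    updates.put("x=5:y=5:t=27", 1L);
    updates.put("x=3:y=5:t=27", 1L);
    sketch.apply(updates).forEach(System.out::println);

    // Second batch of updates from the example.
    updates.clear();
    updates.put("x=3:y=5:t=23", 1L);
    updates.put("x=5:y=5:t=27", -1L);
    updates.put("x=3:y=5:t=29", 1L);
    sketch.apply(updates).forEach(System.out::println);
  }
}
```

The sketch covers only the 3D keys. The 2D lines in the output follow from feeding the same deltas through the projection step in `projectingChangeObs`.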
[ObserverProvider]: {{ site.fluo_api_static }}/1.1.0-incubating/org/apache/fluo/api/observer/ObserverProvider.html
[CombineQueue]: {{ site.fluo_recipes_core_static }}/1.1.0-incubating/org/apache/fluo/recipes/core/combine/CombineQueue.html
[Export Queue]: {{ site.fluo_recipes_core_static }}/1.1.0-incubating/org/apache/fluo/recipes/core/export/ExportQueue.html