This recipe addresses the question: How can I dynamically enable or disable entire portions of my application's analytics?
Imagine a topology that has a variety of analytics that it can perform. Each analytic flow comes with certain costs in terms of demands on the CPU or memory and implications for power consumption. Hence an application may wish to dynamically control whether or not an analytic flow is currently enabled.
An edgent.topology.plumbing.Valve is a simple construct that can be inserted in stream flows to dynamically enable or disable downstream processing. A valve is either open or closed. When used as a Predicate to TStream.filter(), filter passes tuples only when the valve is open. Hence downstream processing is enabled when the valve is open and effectively disabled when the valve is closed.
For example, consider a topology consisting of three analytic processing flows that need to be dynamically enabled or disabled:
    Valve<Readings> flow1Valve = new Valve<>();  // default is open
    Valve<Readings> flow2Valve = new Valve<>(false);  // closed
    Valve<Readings> flow3Valve = new Valve<>(false);

    TStream<Readings> readings = topology.poll(mySensor, 1, TimeUnit.SECONDS);
    addAnalyticFlow1(readings.filter(flow1Valve));
    addAnalyticFlow2(readings.filter(flow2Valve));
    addAnalyticFlow3(readings.filter(flow3Valve));
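The gating behavior can also be seen in isolation. The following is a minimal sketch that uses only JDK classes: `SimpleValve` and `SimpleValveDemo` are hypothetical names and are not Edgent's Valve implementation; they only model the idea of a predicate whose answer depends on a switchable open/closed state rather than on the tuple.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for a valve; NOT Edgent's Valve class.
class SimpleValve<T> implements Predicate<T> {
    // volatile so a change made by a control thread is visible to the thread running the filter
    private volatile boolean open;

    SimpleValve(boolean open) {
        this.open = open;
    }

    void setOpen(boolean open) {
        this.open = open;
    }

    boolean isOpen() {
        return open;
    }

    // Accepts every tuple while open and rejects every tuple while closed.
    @Override
    public boolean test(T tuple) {
        return open;
    }
}

class SimpleValveDemo {
    // Feeds readings 1..6 through the valve, closing it after reading 2
    // and reopening it after reading 4.
    static List<Integer> run() {
        SimpleValve<Integer> valve = new SimpleValve<>(true);
        List<Integer> passed = new ArrayList<>();
        for (int reading = 1; reading <= 6; reading++) {
            if (valve.test(reading)) {
                passed.add(reading);
            }
            if (reading == 2) {
                valve.setOpen(false);
            }
            if (reading == 4) {
                valve.setOpen(true);
            }
        }
        return passed;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2, 5, 6]
    }
}
```

Readings 3 and 4 are dropped because they arrive while the valve is closed; nothing downstream of the filter sees them.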
Elsewhere in the application, perhaps in response to a device command from an external service, such as when using an IotProvider or IotDevice, valves may be opened and closed dynamically to achieve the desired effects. For example:
    TStream<JsonObject> cmds = simulatedValveCommands(topology);
    cmds.sink(json -> {
        // Each command names a valve and the state it should be set to.
        String valveId = json.getAsJsonPrimitive("valve").getAsString();
        boolean isOpen = json.getAsJsonPrimitive("isOpen").getAsBoolean();
        switch (valveId) {
            case "flow1": flow1Valve.setOpen(isOpen); break;
            case "flow2": flow2Valve.setOpen(isOpen); break;
            case "flow3": flow3Valve.setOpen(isOpen); break;
        }
    });
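The switch above silently ignores an unrecognized valve ID. One possible alternative, sketched here with plain JDK classes, keeps the valves in a map keyed by ID and reports unknown IDs to the caller. `ValveRegistry` and its methods are hypothetical names, and `AtomicBoolean` stands in for Edgent's Valve so that the sketch runs without Edgent on the classpath.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper, not part of Edgent. Each AtomicBoolean stands in for a valve.
class ValveRegistry {
    private final Map<String, AtomicBoolean> valves = new ConcurrentHashMap<>();

    void register(String valveId, boolean initiallyOpen) {
        valves.put(valveId, new AtomicBoolean(initiallyOpen));
    }

    // Applies a command; returns false if no valve is registered under valveId.
    boolean setOpen(String valveId, boolean isOpen) {
        AtomicBoolean valve = valves.get(valveId);
        if (valve == null) {
            return false;
        }
        valve.set(isOpen);
        return true;
    }

    // Unknown valves are reported as closed.
    boolean isOpen(String valveId) {
        AtomicBoolean valve = valves.get(valveId);
        return valve != null && valve.get();
    }
}

class ValveRegistryDemo {
    public static void main(String[] args) {
        ValveRegistry registry = new ValveRegistry();
        registry.register("flow1", true);
        registry.register("flow2", false);

        System.out.println(registry.setOpen("flow2", true)); // prints true
        System.out.println(registry.isOpen("flow2"));        // prints true
        System.out.println(registry.setOpen("flow9", true)); // prints false (unknown ID)
    }
}
```

With this shape, adding a fourth analytic flow means registering one more entry rather than adding another case, and the command handler can log or reject commands for which `setOpen` returns false.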
Another approach for achieving dynamic control over which analytic flows are running is to use loosely coupled applications.
In this approach, the overall application is partitioned into multiple applications (topologies). In the above example there could be four applications: one that publishes the sensor readings stream, and one for each of the analytic flows.
The separate applications can connect to each other's streams using the edgent.connectors.pubsub.PublishSubscribe connector.
Rather than having all of the analytic applications running all of the time, applications can be registered with an edgent.topology.services.ApplicationService. Registered applications can then be started and stopped dynamically.
The edgent.providers.iot.IotProvider is designed to facilitate this style of use.
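The register-then-start-on-demand pattern can be illustrated with a small conceptual model. This is only a sketch using JDK classes: `AppRegistrySketch`, its methods, and the application names are hypothetical and do not reflect the actual ApplicationService or IotProvider API, which should be taken from the Edgent Javadoc.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of "register once, start and stop by name"; NOT Edgent's API.
class AppRegistrySketch {
    private final Map<String, Runnable> startActions = new HashMap<>();
    private final Set<String> running = new TreeSet<>();

    // Registering an application only records how to start it; nothing runs yet.
    synchronized void register(String name, Runnable startAction) {
        startActions.put(name, startAction);
    }

    // Returns false if the name is unknown or the application is already running.
    synchronized boolean start(String name) {
        Runnable action = startActions.get(name);
        if (action == null || running.contains(name)) {
            return false;
        }
        action.run();
        running.add(name);
        return true;
    }

    // Returns false if the application was not running.
    synchronized boolean stop(String name) {
        return running.remove(name);
    }

    // Snapshot of the running application names, in sorted order.
    synchronized Set<String> running() {
        return new TreeSet<>(running);
    }
}

class AppRegistryDemo {
    static Set<String> run() {
        AppRegistrySketch registry = new AppRegistrySketch();
        registry.register("sensorPublisher", () -> { });
        registry.register("analyticFlow1", () -> { });
        registry.register("analyticFlow2", () -> { });

        registry.start("sensorPublisher");
        registry.start("analyticFlow1");
        registry.start("analyticFlow2");
        registry.stop("analyticFlow1"); // this flow no longer consumes resources
        return registry.running();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [analyticFlow2, sensorPublisher]
    }
}
```

Compared with valves, a stopped application in this model is not running at all, whereas a closed valve leaves the upstream part of the topology running and only discards tuples at the filter.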