| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| --> |
| |
| <html lang="en"> |
| <head> |
| <title>Edgent Source Streams</title> |
| </head> |
| <body> |
| <div role="main" aria-label="Source streams"> |
| <H1>Edgent Source Streams</H1> |
| <h2>Introduction</h2> |
| One of the first things you probably want to do is to bring data |
| into your Edgent applications. |
| The task is to create a <em>source stream</em> that contains the data |
| that is to be processed and analyzed by Edgent. |
| <br> |
| Edgent provides a number of connectors that supply source streams, such as IoTF, MQTT, Kafka, Files and HTTP. |
| For a specific device or application, however, you may need to develop your own source streams. |
| <h2>Source Streams</h2> |
| To simplify the discussion we will describe these in terms of reading sensors, |
| though each approach can apply to non-sensor data, such as polling an HTTP server for information. |
| <P> |
| Thus the goal is to produce a <em>source stream</em> where each tuple on the stream represents |
| a reading from a sensor (or multiple sensors if the tuple object contains a sensor identifier). |
| </P> |
| <P> |
| Source streams are created using functions, and there are three styles of bringing data into Edgent: |
| </P> |
| <OL> |
| <LI> <a href="#polling">Polling</a> - periodic polling of a sensor's value. </LI> |
| <LI> <a href="#blocking">Blocking</a> - a request is made to fetch data from a sensor, and the call blocks until a reading is available. </LI> |
| <LI> <a href="#events">Events</a> - an event is created (by a non-Edgent framework) when the sensor changes. </LI> |
| </OL> |
| <h3 id="polling">Polling Sources</h3> |
| To poll a sensor periodically, an application provides a <b>non-blocking</b> |
| <a href="../../function/Supplier.html">Supplier function</a> that reads the sensor's value |
| and passes that function to <a href="../Topology.html#poll-org.apache.edgent.function.Supplier-long-java.util.concurrent.TimeUnit-">Topology.poll()</a>. |
| <P> |
| For example, imagine a library provides a static method to read an engine's oil temperature: |
| <pre> |
| <code> |
| double oilTemp = EngineInfo.getOilTemperature(); |
| </code> |
| </pre> |
| <br> |
| To declare a stream in a topology that results in the sensor being read every 100ms, |
| an application uses a lambda expression as the function and passes it to <code>poll()</code>: |
| <pre> |
| <code> |
| TStream&lt;Double&gt; oilTemps = topology.poll(() -&gt; EngineInfo.getOilTemperature(), 100, TimeUnit.MILLISECONDS); |
| </code> |
| </pre> |
| At runtime this results in a stream that receives a new tuple approximately every 100ms, |
| each tuple holding the current value of the oil temperature (as a <code>Double</code> object). |
| <BR> |
| A JSON source stream can be created that contains the sensor identifier in addition to the sensor reading. |
| This allows multiple sensors to be present on the same stream (for example by merging a stream containing |
| oil temperatures polled every 100ms with one containing coolant temperatures polled every ten seconds), |
| while still allowing downstream processing to be partitioned by sensor identifier. |
| <br> |
| Here's an example of representing the same oil temperature as a JSON object: |
| <pre> |
| <code> |
| TStream&lt;JsonObject&gt; oilTemps = topology.poll(() -&gt; |
| { |
| JsonObject j = new JsonObject(); |
| j.addProperty("id", "oilTemp"); |
| j.addProperty("value", EngineInfo.getOilTemperature()); |
| return j; |
| }, 100, TimeUnit.MILLISECONDS); |
| </code> |
| </pre> |
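<P>
The polling contract can also be pictured without Edgent. The following is a minimal plain-Java sketch,
not Edgent's implementation: a scheduler calls a supplier at a fixed period and collects each returned value.
The names <code>PollingSketch</code> and <code>pollReadings</code> are hypothetical, and
<code>java.util.function.Supplier</code> stands in for Edgent's own <code>Supplier</code> interface.
</P>

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical illustration of periodic polling; not part of Edgent. */
final class PollingSketch {

    /** Calls sensor.get() once per period until count readings have been collected. */
    static <T> List<T> pollReadings(Supplier<T> sensor, int count, long period, TimeUnit unit) {
        List<T> readings = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(count);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            scheduler.scheduleAtFixedRate(() -> {
                // Ignore any extra invocation that fires after enough readings exist.
                if (done.getCount() > 0) {
                    readings.add(sensor.get());
                    done.countDown();
                }
            }, 0, period, unit);
            done.await(); // wait until count readings have arrived
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while polling", e);
        } finally {
            scheduler.shutdownNow();
        }
        return new ArrayList<>(readings);
    }
}
```

<P>
In this sketch a supplier that blocked for longer than the period would delay every later reading,
which suggests why a polled function is expected to return promptly.
</P>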
| <h3 id="blocking">Blocking Sources</h3> |
| Some sensors are of the blocking style: a method is called that blocks until a new reading is available. |
| As with the polling style, an application provides a |
| <a href="../../function/Supplier.html">Supplier function</a> that reads the sensor's value. The difference |
| here is that the method can block waiting for a reading to become available. When a reading is available the method |
| returns and the value is put on the returned stream. |
| <P> |
| The function is passed to |
| <a href="../Topology.html#generate-org.apache.edgent.function.Supplier-">Topology.generate()</a> |
| which at runtime results in a dedicated thread that calls <code>get()</code> in a loop. |
| Each time the function has a reading available it returns it, which places the returned value |
| on the stream. The function is then called again to wait for the next available reading. |
| <br> |
| In this example the function is similar to the one in the polling example; the only difference is that the |
| call to get the sensor reading blocks: |
| <pre> |
| <code> |
| TStream&lt;JsonObject&gt; drillDepths = topology.generate(() -&gt; |
| { |
| JsonObject j = new JsonObject(); |
| j.addProperty("id", "drillDepth"); |
| // blocks until the drill has completed a |
| // requested move and returns its current depth. |
| j.addProperty("value", Drill.getDepth()); |
| return j; |
| }); |
| </code> |
| </pre> |
| </P> |
| <P> |
| <a href="../Topology.html#source-org.apache.edgent.function.Supplier-">Topology.source()</a> is another method |
| that supports a blocking source. This time the <code>Supplier</code> function passed in is called once |
| to fetch the <code>Iterable</code> object that will be iterated over. The iterable object may be finite or infinite. In either |
| case the <code>next()</code> method of its iterator may block waiting to obtain the next value to place on the stream. |
| </P> |
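<P>
As an illustration of the kind of object such a <code>Supplier</code> might return, here is a minimal sketch
of an infinite <code>Iterable</code> whose iterator blocks in <code>next()</code> until a reading has been queued.
The class <code>BlockingReadings</code> and its <code>offer()</code> method are hypothetical and use only the
Java standard library; the Edgent call appears only in a comment as assumed usage.
</P>

```java
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical helper: an infinite Iterable whose next() blocks until a reading is queued. */
final class BlockingReadings<T> implements Iterable<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();

    /** Called by sensor-side code whenever a new reading arrives. */
    void offer(T reading) {
        queue.add(reading);
    }

    @Override
    public Iterator<T> iterator() {
        return new Iterator<T>() {
            @Override
            public boolean hasNext() {
                return true; // infinite source: there is always a "next" to wait for
            }

            @Override
            public T next() {
                try {
                    return queue.take(); // blocks until a reading is available
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for a reading", e);
                }
            }
        };
    }
}

// Assumed usage with Edgent (not compiled as part of this sketch):
//   BlockingReadings<Double> depths = new BlockingReadings<>();
//   TStream<Double> drillDepths = topology.source(() -> depths);
```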
| <h3 id="events">Event Sources</h3> |
| <P> |
| This is a typical style for sensors where a framework exists such that an application can register interest |
| in receiving events or notifications when a sensor changes its value or state. This is typically performed |
| by the application registering a callback or listener object that is called by the framework when the sensor changes. |
| </P> |
| <a href="../Topology.html#events-org.apache.edgent.function.Consumer-">Topology.events()</a> is the method to |
| create a source stream from events using a listener function. The Edgent application passes |
| a <a href="../../function/Consumer.html">Consumer function</a> that will be called when the |
| application starts. |
| <BR> |
| The application's consumer function is passed a reference to a different function, <code>eventSubmitter</code>, |
| provided by Edgent. This function (also a <code>Consumer</code>) is used by the callback |
| to place tuples onto the stream (it <em>submits the event to the stream</em>). |
| <BR> |
| Thus the application's function has to: |
| <UL> |
| <LI>Register a callback with the sensor framework that will be called when the sensor changes. </LI> |
| <LI>Have the callback convert the sensor change event information into the tuple type it wants to send on the stream, |
| for example a JSON object. In some cases the raw event object is used as the stream's tuple. </LI> |
| <LI>Call the function provided by Edgent, <code>eventSubmitter.accept(tuple)</code>, to place the |
| tuple onto the stream. </LI> |
| </UL> |
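<P>
The three steps above can be sketched in plain Java. This is only an illustration under stated assumptions:
<code>TemperatureSensor</code> is a hypothetical stand-in for a sensor framework, <code>EventSourceSketch</code>
and <code>eventSetup()</code> are hypothetical names, the tuple is a hand-built JSON string, and
<code>java.util.function.Consumer</code> stands in for Edgent's own <code>Consumer</code> interface.
</P>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a sensor framework that notifies registered listeners. */
final class TemperatureSensor {
    private final List<Consumer<Double>> listeners = new ArrayList<>();

    void addListener(Consumer<Double> listener) {
        listeners.add(listener);
    }

    /** Simulates the framework reporting a changed reading. */
    void fireChange(double celsius) {
        for (Consumer<Double> listener : listeners) {
            listener.accept(celsius);
        }
    }
}

/** Hypothetical illustration of the application's event setup function. */
final class EventSourceSketch {

    /** Returns the function that would be handed to the stream-creating method. */
    static Consumer<Consumer<String>> eventSetup(TemperatureSensor sensor) {
        return eventSubmitter ->
            // Step 1: register a callback with the sensor framework.
            sensor.addListener(celsius -> {
                // Step 2: convert the change event into the tuple type.
                String tuple = "{\"id\":\"temp\",\"value\":" + celsius + "}";
                // Step 3: submit the tuple using the supplied function.
                eventSubmitter.accept(tuple);
            });
    }
}
```

<P>
Here the setup function only registers the callback; tuples are produced later, each time the
framework invokes that callback.
</P>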
| <P> |
| Here's an example of creating a stream of sensor events in Android. |
| <BR> |
| <pre> |
| <code> |
| SensorManager mSensorManager = (SensorManager) getSystemService(Context.SENSOR_SERVICE); |
| |
| // Pass a Consumer that registers a SensorChangeEvents listener, which |
| // puts each SensorEvent onto the stream using the eventSubmitter |
| // supplied by Edgent. |
| |
| // Note: for clarity the application function is implemented as a lambda expression. |
| |
| TStream&lt;SensorEvent&gt; tempSensor = topology.events(eventSubmitter -&gt; |
| mSensorManager.registerListener( |
| new SensorChangeEvents(eventSubmitter), |
| mSensorManager.getDefaultSensor(Sensor.TYPE_AMBIENT_TEMPERATURE), |
| SensorManager.SENSOR_DELAY_NORMAL)); |
| |
| // .... |
| |
| /** |
| * Sensor event listener that submits sensor |
| * change events as tuples using a Consumer. |
| */ |
| public class SensorChangeEvents implements SensorEventListener { |
| private final Consumer&lt;SensorEvent&gt; eventSubmitter; |
| |
| public SensorChangeEvents(Consumer&lt;SensorEvent&gt; eventSubmitter) { |
| this.eventSubmitter = eventSubmitter; |
| } |
| @Override |
| public void onSensorChanged(SensorEvent event) { |
| // Submit the event directly to the stream |
| eventSubmitter.accept(event); |
| } |
| |
| @Override |
| public void onAccuracyChanged(Sensor sensor, int accuracy) { |
| } |
| } |
| |
| </code> |
| </pre> |
| </P> |
| </div> |
| </body> |
| </html> |