In this tutorial, we will add a new data source consisting of a single data stream. The source will be provided as a standalone component (i.e., the description will be accessible through an integrated web server).
We are going to create a new data stream that is produced by a GPS sensor installed in a delivery vehicle. The sensor produces a continuous stream of events that contain the current timestamp, the current lat/lng position of the vehicle and the plate number of the vehicle. Events are published in a JSON format as follows:
```json
{
  "timestamp" : 145838399,
  "latitude" : 37.04,
  "longitude" : 17.04,
  "plateNumber" : "KA-AB 123"
}
```
These events are published to a Kafka broker using the topic `org.streampipes.tutorial.vehicle`.
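To make the event structure concrete, the following sketch models one such event in plain Java and serializes it by hand. The class name `VehicleEvent` and its `toJson` method are illustrative assumptions and not part of the StreamPipes SDK; a real project would more likely rely on a JSON library.

```java
// Hypothetical value class mirroring the four fields of the example event.
// It is not part of the StreamPipes SDK.
final class VehicleEvent {
    final long timestamp;      // current timestamp reported by the sensor
    final double latitude;     // latitude of the vehicle position
    final double longitude;    // longitude of the vehicle position
    final String plateNumber;  // plate number of the vehicle

    VehicleEvent(long timestamp, double latitude, double longitude, String plateNumber) {
        this.timestamp = timestamp;
        this.latitude = latitude;
        this.longitude = longitude;
        this.plateNumber = plateNumber;
    }

    // Builds the JSON representation by simple concatenation. This assumes
    // the plate number contains no characters that require JSON escaping.
    String toJson() {
        return "{\"timestamp\":" + timestamp
            + ",\"latitude\":" + latitude
            + ",\"longitude\":" + longitude
            + ",\"plateNumber\":\"" + plateNumber + "\"}";
    }
}
```

Calling `new VehicleEvent(145838399L, 37.04, 17.04, "KA-AB 123").toJson()` yields a compact variant of the sample event shown above, with the same keys and values.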
In the following section, we show how to describe this stream in a form that allows you to import and use it in StreamPipes.
Instead of creating a new project from scratch, we recommend using the Maven archetype `streampipes-archetype-extensions-jvm` to create a new project skeleton. Enter the following command in a command line of your choice (Apache Maven needs to be installed):
```bash
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.streampipes \
  -DarchetypeArtifactId=streampipes-archetype-extensions-jvm \
  -DarchetypeVersion=0.70.0 \
  -DgroupId=my.groupId \
  -DartifactId=my-source \
  -DclassNamePrefix=MySource \
  -DpackageName=mypackagename
```
You will see a project structure similar to the structure shown in the archetypes section.
:::tip
Besides the basic project skeleton, the sample project also includes an example Dockerfile you can use to package your application into a Docker container.
:::
Now we will add a new data stream definition. First, create a new class `MyVehicleStream`, which should look as follows:
```java
package org.apache.streampipes.pe.example;

import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.sources.AbstractAdapterIncludedStream;

public class MyVehicleStream extends AbstractAdapterIncludedStream {

  @Override
  public SpDataStream declareModel() {
    return null;
  }

  @Override
  public void executeStream() {

  }
}
```
This class extends the class `AbstractAdapterIncludedStream`, which indicates that this source continuously produces data (configured in the `executeStream()` method). In contrast, the class `AbstractAlreadyExistingStream` indicates that we only want to describe an already existing stream (e.g., a stream that already sends data to an existing Kafka broker).
Next, we will add the definition of the data stream. Add the following code inside the `declareModel` method:
```java
return DataStreamBuilder.create("org.apache.streampipes.tutorial.vehicle.position",
    "Vehicle Position",
    "An event stream that produces current vehicle positions")
```
This line creates a new instance of the SDK's `DataStreamBuilder` by providing three basic parameters: The first parameter must be a unique identifier of your data stream. The second and third parameters indicate a label and a description of your stream. These values will later be used in the StreamPipes UI to display stream details in a human-readable manner.
Next, we will add the properties as stated above to the stream definition by adding the following lines:
```java
.property(EpProperties.timestampProperty("timestamp"))
.property(EpProperties.stringEp(
    Labels.from("plate-number", "Plate Number", "Denotes the plate number of the vehicle"),
    "plateNumber", "http://my.company/plateNumber"))
.property(EpProperties.doubleEp(
    Labels.from("latitude", "Latitude", "Denotes the latitude value of the vehicle's position"),
    "latitude", Geo.lat))
.property(EpProperties.doubleEp(
    Labels.from("longitude", "Longitude", "Denotes the longitude value of the vehicle's position"),
    "longitude", Geo.lng))
```
These four event properties compose our event schema. An event property must, at least, provide the following attributes:

- Runtime name: the key of the property at runtime. For example, if a JSON message contains a structure such as `{"plateNumber" : "KA-F 123"}`, the runtime name must be `plateNumber`.
- Runtime type: the data type of the property value. Types are expressed as `XMLSchema` primitives; however, the SDK provides convenience methods to provide the property type.
- Domain property: the semantics of the property. For instance, the `latitude` property is linked to the `http://www.w3.org/2003/01/geo/wgs84_pos#lat` property of the WGS84 vocabulary. The domain property should be a URI that is part of an existing or domain-specific vocabulary. The SDK provides convenience methods for popular vocabularies (e.g., Schema.org, Dolce or WGS84).

In order to complete the minimum required specification of an event stream, we need to provide information on the transport format and protocol of the data stream at runtime.
This can be achieved by extending the builder with the respective properties:
```java
.format(Formats.jsonFormat())
.protocol(Protocols.kafka("localhost", 9094, "org.apache.streampipes.tutorial.vehicle"))
.build();
```
In this example, we defined that the data stream consists of events in a JSON format and that Kafka is used as a message broker to transmit events. The last build() method call triggers the construction of the data stream definition.
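With the schema, format, and protocol declared, every event published at runtime has to match the description: its keys must equal the declared runtime names and its values must have compatible types. The following plain-Java sketch shows one way such a check could look. The class `VehicleSchemaCheck`, its methods, and the mapping from runtime names to Java types are assumptions made for illustration; none of them belong to the StreamPipes SDK.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the StreamPipes SDK: checks that an event
// carries every declared runtime name with a value of the expected Java type.
final class VehicleSchemaCheck {

    // Runtime names from the stream definition, mapped to the Java type each
    // value is assumed to have once the event is held in a Map.
    static Map<String, Class<?>> schema() {
        Map<String, Class<?>> schema = new LinkedHashMap<>();
        schema.put("timestamp", Long.class);
        schema.put("plateNumber", String.class);
        schema.put("latitude", Double.class);
        schema.put("longitude", Double.class);
        return schema;
    }

    // Returns one message per missing or wrongly typed field.
    // An empty list means the event matches the schema.
    static List<String> violations(Map<String, Object> event) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, Class<?>> field : schema().entrySet()) {
            Object value = event.get(field.getKey());
            if (value == null) {
                problems.add("missing field: " + field.getKey());
            } else if (!field.getValue().isInstance(value)) {
                problems.add("wrong type for field: " + field.getKey());
            }
        }
        return problems;
    }
}
```

A check like this is mainly useful in unit tests of a simulator or adapter, where a misspelled runtime name (for example `plate_number` instead of `plateNumber`) would otherwise only surface later when building a pipeline.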
That's it! In the next section, we will connect the data stream to a source and inspect the generated RDF description.
Let's assume our stream should produce some random values that are sent to StreamPipes. We'll add a very simple data simulator to the `executeStream` method as follows:
```java
@Override
public void executeStream() {
  // Producer that publishes to the given Kafka broker and topic
  SpKafkaProducer producer = new SpKafkaProducer("localhost:9094", "my-topic", Collections.emptyList());
  Random random = new Random();

  Runnable runnable = () -> {
    // Emit one random event per second until the JVM stops
    for (;;) {
      JsonObject jsonObject = new JsonObject();
      jsonObject.addProperty("timestamp", System.currentTimeMillis());
      jsonObject.addProperty("plateNumber", "KA-SP 1");
      jsonObject.addProperty("latitude", random.nextDouble());
      jsonObject.addProperty("longitude", random.nextDouble());

      producer.publish(jsonObject.toString());

      try {
        TimeUnit.SECONDS.sleep(1);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  };

  // Run the simulator in its own thread so that executeStream() returns
  new Thread(runnable).start();
}
```
Change the topic and the URL of your Kafka broker so that they match the values declared in the stream definition.
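Because the broker settings appear both in the stream definition and in the simulator, it can help to keep them in one place and to separate event generation from publishing. The sketch below is one possible arrangement in plain Java. The class `VehicleSimulatorSketch`, its constants, and its methods are hypothetical, the constant values are placeholders to be replaced with those of your own setup, and the `Consumer<String>` sink stands in for the Kafka producer. It also scales the random coordinates to valid latitude and longitude ranges, which the simulator above does not do.

```java
import java.util.Locale;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical variant of the simulator; all names are illustrative only.
final class VehicleSimulatorSketch {

    // Placeholder broker settings. They must match the values passed to
    // Protocols.kafka(...) in declareModel().
    static final String KAFKA_HOST = "localhost";
    static final int KAFKA_PORT = 9094;
    static final String TOPIC = "my-topic";

    static String bootstrapServer() {
        return KAFKA_HOST + ":" + KAFKA_PORT;
    }

    // Builds one event as JSON. The raw random values are scaled to
    // [-90, 90) for latitude and [-180, 180) for longitude before formatting.
    static String nextEvent(Random random, long timestamp) {
        double latitude = random.nextDouble() * 180.0 - 90.0;
        double longitude = random.nextDouble() * 360.0 - 180.0;
        return String.format(Locale.ROOT,
            "{\"timestamp\":%d,\"plateNumber\":\"KA-SP 1\",\"latitude\":%.6f,\"longitude\":%.6f}",
            timestamp, latitude, longitude);
    }

    // Sends one event per second to the sink and returns the scheduler so
    // that the caller can stop the simulation with shutdownNow().
    static ScheduledExecutorService start(Consumer<String> sink) {
        Random random = new Random();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(
            () -> sink.accept(nextEvent(random, System.currentTimeMillis())),
            0, 1, TimeUnit.SECONDS);
        return scheduler;
    }
}
```

Inside `executeStream()`, the sink could be a lambda such as `message -> producer.publish(message)`, with the producer created from `bootstrapServer()` and `TOPIC`. Returning the scheduler is a design choice that allows a clean shutdown, which the endless `for (;;)` loop does not offer.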
You need to register the stream in the service definition. Open the `Init` class and register the `MyVehicleStream`:
```java
@Override
public SpServiceDefinition provideServiceDefinition() {
  return SpServiceDefinitionBuilder.create("org.apache.streampipes",
          "human-readable service name",
          "human-readable service description", 8090)
      .registerPipelineElement(new ExampleDataProcessor())
      .registerPipelineElement(new ExampleDataSink())
      .registerPipelineElement(new MyVehicleStream())
      .registerMessagingFormats(
          new JsonDataFormatFactory(),
          new CborDataFormatFactory(),
          new SmileDataFormatFactory(),
          new FstDataFormatFactory())
      .registerMessagingProtocols(
          new SpKafkaProtocolFactory(),
          new SpJmsProtocolFactory(),
          new SpMqttProtocolFactory())
      .build();
}
```
You can remove the other two example classes if you want.
:::tip
Once you start the service, it will register in StreamPipes with the hostname. The hostname will be auto-discovered and should work out-of-the-box. In some cases, the detected hostname is not resolvable from within a container (where the core is running). In this case, provide a SP_HOST environment variable to override the auto-discovery.
:::
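The override described in the tip can be pictured as a simple fallback rule: use `SP_HOST` when it is set, otherwise use the discovered hostname. The following sketch only illustrates that rule. It is not the StreamPipes implementation, and the class `HostResolutionSketch` and its method are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: this is not how StreamPipes is implemented internally.
final class HostResolutionSketch {

    // Returns the SP_HOST value when it is set and non-blank,
    // otherwise falls back to the auto-discovered hostname.
    static String resolveHost(Map<String, String> env, String discoveredHost) {
        String override = env.get("SP_HOST");
        if (override != null && !override.trim().isEmpty()) {
            return override.trim();
        }
        return discoveredHost;
    }
}
```

In a real process, the environment map would come from `System.getenv()`; passing it in as a parameter keeps the rule easy to test.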
Now we are ready to start our first container!
Configure your IDE to provide an environment variable called `SP_DEBUG` with value `true` when starting the project.
The service automatically registers itself in StreamPipes.
To install the newly created element, open the StreamPipes UI and install the source in the Install Pipeline Elements section.
Congratulations! You've just created your first pipeline element for StreamPipes. There are many more things to explore, and data sources can be defined in much more detail. Also note that in some cases you may want to create a configurable adapter, where users configure a data source in the UI when connecting data.