In this tutorial, we will add a new data source consisting of a single data stream. The source will be provided as a standalone component (i.e., the description will be accessible through an integrated web server).
We are going to create a new data stream that is produced by a GPS sensor installed in a delivery vehicle. The sensor produces a continuous stream of events that contain the current timestamp, the current lat/lng position of the vehicle and the plate number of the vehicle. Events are published in a JSON format as follows:
{ "timestamp" : 145838399, "latitude" : 37.04, "longitude" : 17.04, "plateNumber" : "KA-AB 123" }
These events are published to a Kafka broker using the topic `org.streampipes.tutorial.vehicle`.
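Such an event can be assembled with plain Java. The following sketch uses only the JDK (no StreamPipes dependencies; the class and method names are chosen here for illustration) to serialize the example values from above:

```java
import java.util.Locale;

// Minimal sketch: serializes a vehicle position event to the JSON shape shown above.
public class VehicleEventExample {

    static String toJson(long timestamp, double latitude, double longitude, String plateNumber) {
        // Locale.ROOT keeps the decimal separator a dot regardless of the system locale
        return String.format(Locale.ROOT,
                "{ \"timestamp\" : %d, \"latitude\" : %.2f, \"longitude\" : %.2f, \"plateNumber\" : \"%s\" }",
                timestamp, latitude, longitude, plateNumber);
    }

    public static void main(String[] args) {
        System.out.println(toJson(145838399L, 37.04, 17.04, "KA-AB 123"));
    }
}
```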
In the following section, we show how to describe this stream in a form that allows you to import and use it in StreamPipes.
Instead of creating a new project from scratch, we recommend using the Maven archetype to create a new project skeleton. Enter the following command in a command line of your choice (Apache Maven needs to be installed):
```bash
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.streampipes \
  -DarchetypeArtifactId=streampipes-archetype-pe-sources \
  -DarchetypeVersion=0.68.0 \
  -DgroupId=my.groupId \
  -DartifactId=my-source \
  -DclassNamePrefix=MySource \
  -DpackageName=mypackagename
```
Configure the variables `artifactId` (which will be the Maven artifactId), `classNamePrefix` (which will be the class name of your data stream) and `packageName`.

For this tutorial, use `Vehicle` as `classNamePrefix`.
Your project will look as follows:
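The generated skeleton should roughly resemble the layout below (an approximation based on the package names used later in this tutorial; exact names depend on your `groupId`, `artifactId` and `packageName` choices):

```
my-source
├── pom.xml
└── src
    └── main
        ├── java
        │   └── my/groupId
        │       ├── config          // Config class for host/port settings
        │       ├── main            // Init class (container entry point)
        │       └── pe/mypackagename  // DataSource and MySourceStream
        └── resources
```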
That's it, go to the next section to learn how to create your first data stream!
Now we will add a new data stream definition. First, open the class `VehicleStream`, which should look as follows:
```java
package my.groupId.pe.mypackagename;

import org.streampipes.model.SpDataStream;
import org.streampipes.model.graph.DataSourceDescription;
import org.streampipes.sdk.builder.DataStreamBuilder;
import org.streampipes.sdk.helpers.EpProperties;
import org.streampipes.sdk.helpers.Formats;
import org.streampipes.sdk.helpers.Protocols;
import org.streampipes.sources.AbstractAdapterIncludedStream;

public class MySourceStream extends AbstractAdapterIncludedStream {

  @Override
  public SpDataStream declareModel(DataSourceDescription sep) {
    return DataStreamBuilder.create("my.groupId-mypackagename", "MySource", "")
            .property(EpProperties.timestampProperty("timestamp"))

            // configure your stream here

            .format(Formats.jsonFormat())
            .protocol(Protocols.kafka("localhost", 9092, "TOPIC_SHOULD_BE_CHANGED"))
            .build();
  }

  @Override
  public void executeStream() {
  }
}
```
This class extends the class `AbstractAdapterIncludedStream`, which indicates that this source continuously produces data (configured in the `executeStream()` method). In contrast, the class `AbstractAlreadyExistingStream` indicates that we only want to describe an already existing stream (e.g., a stream that already sends data to an existing Kafka broker).
Next, we will add the definition of the data stream. Add the following code inside of the `declareModel` method:
```java
return DataStreamBuilder.create("org.streampipes.tutorial.vehicle.position", "Vehicle Position",
        "An event stream that produces current vehicle positions")
```
This line creates a new instance of the SDK's `DataStreamBuilder` by providing three basic parameters: The first parameter must be a unique identifier of your data stream. The second and third parameters indicate a label and a description of your stream. These values will later be used in the StreamPipes UI to display stream details in a human-readable manner.
Next, we will add the properties as stated above to the stream definition by adding the following lines:
```java
.property(EpProperties.timestampProperty("timestamp"))
.property(EpProperties.stringEp(Labels.from("plate-number", "Plate Number",
        "Denotes the plate number of the vehicle"), "plateNumber", "http://my.company/plateNumber"))
.property(EpProperties.doubleEp(Labels.from("latitude", "Latitude",
        "Denotes the latitude value of the vehicle's position"), "latitude", Geo.lat))
.property(EpProperties.doubleEp(Labels.from("longitude", "Longitude",
        "Denotes the longitude value of the vehicle's position"), "longitude", Geo.lng))
```
These four event properties compose our event schema. An event property must, at least, provide the following attributes:

- **Runtime name.** The runtime name relates to the name of the field in the actual JSON event. As our example event contains the field `{"plateNumber" : "KA-F 123"}`, the runtime name must be `plateNumber`.
- **Runtime type.** The runtime type defines the data type of the property. Types are expressed as `XMLSchema` primitives; however, the SDK provides convenience methods to provide the property type.
- **Domain property.** The domain property captures the semantics of the event property. For instance, the `latitude` property is linked to the `http://www.w3.org/2003/01/geo/wgs84_pos#lat` property of the WGS84 vocabulary. The domain property should be a URI that is part of an existing or domain-specific vocabulary. The SDK provides convenience methods for popular vocabularies (e.g., Schema.org, Dolce or WGS84).

In order to complete the minimum required specification of an event stream, we need to provide information on the transport format and protocol of the data stream at runtime.
This can be achieved by extending the builder with the respective properties (which should already have been auto-generated):
```java
.format(Formats.jsonFormat())
.protocol(Protocols.kafka("localhost", 9092, "TOPIC_SHOULD_BE_CHANGED"))
.build();
```
Set `org.streampipes.tutorial.vehicle` as your new topic by replacing the term `TOPIC_SHOULD_BE_CHANGED`.
In this example, we defined that the data stream consists of events in a JSON format and that Kafka is used as a message broker to transmit events. The last `build()` method call triggers the construction of the RDF-based data stream definition.
That's it! In the next section, we will connect the data stream to a source and inspect the generated RDF description.
Let's assume our stream should produce some random values that are sent to StreamPipes. We'll add a very simple data simulator to the `executeStream` method as follows:
```java
@Override
public void executeStream() {

  SpKafkaProducer producer = new SpKafkaProducer("localhost:9092", "TOPIC_SHOULD_BE_CHANGED");
  Random random = new Random();
  Runnable runnable = new Runnable() {
    @Override
    public void run() {
      for (;;) {
        JsonObject jsonObject = new JsonObject();
        jsonObject.addProperty("timestamp", System.currentTimeMillis());
        jsonObject.addProperty("plateNumber", "KA-FZ 1");
        jsonObject.addProperty("latitude", random.nextDouble());
        jsonObject.addProperty("longitude", random.nextDouble());

        producer.publish(jsonObject.toString());

        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }
  };

  new Thread(runnable).start();
}
```
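The endless loop with `Thread.sleep` can also be expressed with a `ScheduledExecutorService` from the JDK. The sketch below is a self-contained approximation (JDK only; the `Consumer` stands in for the Kafka producer, and the JSON is built with `String.format` instead of Gson) that emits a few events and then stops:

```java
import java.util.Locale;
import java.util.Random;
import java.util.concurrent.*;
import java.util.function.Consumer;

// Sketch: the fixed-rate publishing loop expressed with a ScheduledExecutorService.
// In the real stream, the Consumer below would be replaced by SpKafkaProducer.publish(...).
public class SimulatorSketch {

    public static void main(String[] args) throws Exception {
        Random random = new Random();
        BlockingQueue<String> published = new LinkedBlockingQueue<>();
        Consumer<String> producer = published::add; // stand-in for the Kafka producer

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> task = scheduler.scheduleAtFixedRate(() -> {
            String event = String.format(Locale.ROOT,
                "{\"timestamp\": %d, \"plateNumber\": \"KA-FZ 1\", \"latitude\": %f, \"longitude\": %f}",
                System.currentTimeMillis(), random.nextDouble(), random.nextDouble());
            producer.accept(event);
        }, 0, 100, TimeUnit.MILLISECONDS); // a real simulator would use a 1s period

        // Collect a few events, then stop the simulator
        for (int i = 0; i < 3; i++) {
            System.out.println(published.take());
        }
        task.cancel(false);
        scheduler.shutdown();
    }
}
```

A scheduled task is easier to cancel cleanly than a raw thread running an infinite loop, which matters once the container is shut down.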
Change the topic and the URL of your Kafka broker (as stated in the controller).
A data source can be seen as a container for a set of data streams. Usually, a data source includes events that are logically or physically connected. For instance, in our example we would add other streams produced by vehicle sensors (such as fuel consumption) to the same data source description.
Open the class `DataSource`, which should look as follows:
```java
package my.groupId.pe.mypackagename;

import org.streampipes.container.declarer.DataStreamDeclarer;
import org.streampipes.container.declarer.SemanticEventProducerDeclarer;
import org.streampipes.model.graph.DataSourceDescription;
import org.streampipes.sdk.builder.DataSourceBuilder;

import java.util.Arrays;
import java.util.List;

public class DataSource implements SemanticEventProducerDeclarer {

  public DataSourceDescription declareModel() {
    return DataSourceBuilder.create("my.groupId.mypackagename.source", "MySource Source", "")
            .build();
  }

  public List<DataStreamDeclarer> getEventStreams() {
    return Arrays.asList(new MySourceStream());
  }
}
```
First, we need to define the source. Similar to data streams, a source consists of an id, a human-readable name and a description. Replace the content defined in the `declareModel` method with the following code:
```java
return DataSourceBuilder.create("org.streampipes.tutorial.source.vehicle", "Vehicle Source",
        "A data source that holds event streams produced by vehicles.")
        .build();
```
The final step is to define the deployment type of our new data source. In this tutorial, we will create a so-called `StandaloneModelSubmitter`. This client will start an embedded web server that provides the description of our data source.
Go to the class `Init`, which extends `StandaloneModelSubmitter` and should look as follows:
```java
package my.groupId.main;

import org.streampipes.container.init.DeclarersSingleton;
import org.streampipes.container.standalone.init.StandaloneModelSubmitter;

import my.groupId.config.Config;
import my.groupId.pe.mypackagename.DataSource;

public class Init extends StandaloneModelSubmitter {

  public static void main(String[] args) throws Exception {
    DeclarersSingleton.getInstance()
            .add(new DataSource());

    new Init().init(Config.INSTANCE);
  }
}
```
This code registers the data source we just defined. Finally, the `init` method is called, which triggers the generation of the corresponding RDF description and the startup of the web server.
Now we are ready to start our first container!
Execute the main method in the class `Init` we've just created, open a web browser and navigate to http://localhost:8090 (or change the port according to the value of the `SP_PORT` variable in the env file).
You should see something as follows:
Click on the link of the data source to see the RDF description of the pipeline element.
The container automatically registers itself in the Consul installation of StreamPipes.
To install the just created element, open the StreamPipes UI and follow the manual provided in the [user guide](../user-guide-introduction).
Congratulations! You've just created your first pipeline element for StreamPipes. There are many more things to explore and data sources can be defined in much more detail.