| --- | 
 | id: extend-tutorial-data-processors | 
 | title: "Tutorial: Data Processors" | 
 | sidebar_label: "Tutorial: Data Processors" | 
 | --- | 
 |  | 
 | In this tutorial, we will add a new data processor. | 
 |  | 
 | From an architectural point of view, we will create a self-contained service that includes the description of the data | 
 | processor and an implementation. | 
 |  | 
 | ## Objective | 
 |  | 
 | We are going to create a new data processor that realizes a simple geofencing algorithm - we detect vehicles that enter | 
 | a specified radius around a user-defined location. | 
 | This pipeline element will be a generic element that works with any event stream that provides geospatial coordinates in | 
 | form of a latitude/longitude pair. | 
 |  | 
 | The algorithm outputs every location event whose position lies inside the geofence. | 
 |  | 
 | :::note | 
 |  | 
 | The implementation in this tutorial is pretty simple - our processor will fire an event every time the GPS location is | 
 | inside the geofence. | 
 | In a real-world application, you would probably want to define a pattern that recognizes the _first_ event a vehicle | 
 | enters the geofence. | 
 |  | 
 | This can be easily done using a CEP library. | 
 |  | 
 | ::: | 
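
For a single vehicle, the "first event" pattern mentioned in the note can also be approximated without a CEP library by remembering whether the previous event was inside the fence. The following is a minimal sketch of that idea, not part of the tutorial's processor; `EntryDetector` and `isEntry` are hypothetical names, and a real pipeline would need one detector per vehicle.

```java
// Minimal sketch: report only the transition from "outside" to "inside".
// EntryDetector and isEntry are hypothetical names, not part of the StreamPipes SDK.
class EntryDetector {

  // Whether the previous event of this vehicle was inside the geofence.
  private boolean wasInside = false;

  // Returns true only for the first event after the vehicle crossed into the fence.
  boolean isEntry(boolean nowInside) {
    boolean entered = nowInside && !wasInside;
    wasInside = nowInside;
    return entered;
  }

  public static void main(String[] args) {
    EntryDetector detector = new EntryDetector();
    // Simulated sequence of "is the position inside the fence?" results.
    boolean[] insideFlags = {false, true, true, false, true};
    for (boolean inside : insideFlags) {
      System.out.println(inside + " -> entry: " + detector.isEntry(inside));
    }
  }
}
```

With this approach, consecutive events inside the fence are suppressed, and a new entry is reported only after the vehicle has left the fence and returned.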
 |  | 
 | ## Project setup | 
 |  | 
 | Instead of creating a new project from scratch, we recommend using the Maven archetype | 
 | `streampipes-archetype-extensions-jvm` to create a new project skeleton. | 
 | Enter the following command in a command line of your choice (Apache Maven needs to be installed): | 
 |  | 
 | ``` | 
 | mvn archetype:generate \ | 
 | -DarchetypeGroupId=org.apache.streampipes -DarchetypeArtifactId=streampipes-archetype-extensions-jvm \ | 
 | -DarchetypeVersion=0.93.0 -DgroupId=my.groupId \ | 
 | -DartifactId=my-example -DclassNamePrefix=MyExample -DpackageName=mypackagename | 
 | ``` | 
 |  | 
 | You will see a project structure similar to the structure shown in the [archetypes](06_extend-archetypes.md) section. | 
 |  | 
 | :::tip | 
 |  | 
 | Besides the basic project skeleton, the sample project also includes an example Dockerfile you can use to package your | 
 | application into a Docker container. | 
 |  | 
 | ::: | 
 |  | 
 | Now you're ready to create your first data processor for StreamPipes! | 
 |  | 
 | ## Adding data processor requirements | 
 |  | 
 | First, we will add a new stream requirement. | 
 | Create a new class `GeofencingProcessor` which should look as follows: | 
 |  | 
 | ```java | 
 | package org.apache.streampipes.pe.example; | 
 |  | 
 | import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor; | 
 | import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration; | 
 | import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext; | 
 | import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters; | 
 | import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector; | 
 | import org.apache.streampipes.model.DataProcessorType; | 
 | import org.apache.streampipes.model.runtime.Event; | 
 | import org.apache.streampipes.model.schema.PropertyScope; | 
 | import org.apache.streampipes.sdk.builder.ProcessingElementBuilder; | 
 | import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; | 
 | import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration; | 
 | import org.apache.streampipes.sdk.helpers.EpRequirements; | 
 | import org.apache.streampipes.sdk.helpers.Labels; | 
 | import org.apache.streampipes.sdk.helpers.OutputStrategies; | 
 | import org.apache.streampipes.sdk.utils.Assets; | 
 | import org.apache.streampipes.vocabulary.Geo; | 
 |  | 
 | public class GeofencingProcessor implements IStreamPipesDataProcessor { | 
 |  | 
 |   private static final String LATITUDE_CENTER = "latitude-center"; | 
 |   private static final String LONGITUDE_CENTER = "longitude-center"; | 
 |  | 
 |   @Override | 
 |   public IDataProcessorConfiguration declareConfig() { | 
 |     return DataProcessorConfiguration.create( | 
 |         GeofencingProcessor::new, | 
 |         ProcessingElementBuilder.create( | 
 |               "org.apache.streampipes.tutorial-geofencing" | 
 |             ) | 
 |             .category(DataProcessorType.ENRICH) | 
 |             .withAssets(Assets.DOCUMENTATION, Assets.ICON) | 
 |             .build()); | 
 |   } | 
 |  | 
 |   @Override | 
 |   public void onPipelineStarted(IDataProcessorParameters params,  | 
 |                                 SpOutputCollector collector,  | 
 |                                 EventProcessorRuntimeContext runtimeContext) { | 
 |  | 
 |   } | 
 |  | 
 |   @Override | 
 |   public void onEvent(Event event,  | 
 |                       SpOutputCollector collector) { | 
 |  | 
 |   } | 
 |  | 
 |   @Override | 
 |   public void onPipelineStopped() { | 
 |  | 
 |   } | 
 | } | 
 |  | 
 |  | 
 | ``` | 
 |  | 
 | In this class, we need to implement four methods. The `declareConfig` method is used to define abstract stream | 
 | requirements such as event properties that must be present in any input stream that is later connected to the element | 
 | using the StreamPipes UI. | 
 | The second method, `onPipelineStarted`, is triggered once a pipeline is started. | 
 | The `onEvent` method is called for every incoming event. | 
 | Finally, the `onPipelineStopped` method is called once the pipeline is stopped. | 
 |  | 
 | Similar to data sources, the SDK provides a builder class to generate the description for data processors. | 
 |  | 
 | The current code within the `declareConfig` method creates a new data processor with the ID `org.apache.streampipes.tutorial-geofencing`. | 
 | The ID is used as the internal ID of the data processor and also references additional assets in the `resources` folder, such as a `strings.en` file, which configures labels and descriptions, and a `documentation.md` file, which will later serve as Markdown documentation in the UI. | 
 | But first, we will add some _stream requirements_ to the description. As we'd like to develop a generic pipeline element that | 
 | works with any event that provides a lat/lng pair, we define two stream requirements as stated below: | 
 |  | 
 | ```java | 
 | .requiredStream(StreamRequirementsBuilder | 
 |     .create() | 
 |     .requiredPropertyWithUnaryMapping( | 
 |         EpRequirements.domainPropertyReq(Geo.LAT), | 
 |         Labels.from("latitude-field","Latitude","The event property containing the latitude value"), | 
 |         PropertyScope.MEASUREMENT_PROPERTY | 
 |     ) | 
 |     .requiredPropertyWithUnaryMapping( | 
 |         EpRequirements.domainPropertyReq(Geo.LNG), | 
 |         Labels.from("longitude-field","Longitude","The event property containing the longitude value"), | 
 |         PropertyScope.MEASUREMENT_PROPERTY | 
 |     ) | 
 |     .build()) | 
 | ``` | 
 |  | 
 | The first line, `.requiredStream()`, defines that we want a data processor with exactly one input stream. Adding more | 
 | stream requirements would create elements with multiple input connectors in StreamPipes. | 
 | Stream requirements can be assigned by using the `StreamRequirementsBuilder` class. | 
 | In our example, we define two requirements, so-called _domain property requirements_. In contrast to _data type | 
 | requirements_, where we'd expect an event property with a field of a specific data type (e.g., float), domain property | 
 | requirements expect a specific semantic type (called domain property), e.g., from a vocabulary such as the WGS84 Geo vocabulary. | 
 |  | 
 | Once a pipeline is deployed, we are interested in the actual field (and its field name) that contains the latitude and | 
 | longitude values. | 
 | In some cases, there might be more than one field that satisfies a property requirement, and we would like users to | 
 | select the property the geofencing component should operate on. | 
 | Therefore, our example uses the method `requiredPropertyWithUnaryMapping`, which will map a requirement to a real event | 
 | property of an input stream and let the user choose the appropriate field in the StreamPipes UI when pipelines are | 
 | defined. | 
 |  | 
 | Finally, the `PropertyScope` indicates that the required property is a measurement value (in contrast to a dimension | 
 | value). This allows us later to provide improved user guidance in the pipeline editor. | 
 |  | 
 | Static properties such as the radius parameter we add below are described in a similar way to mapping properties: they have an internal ID (e.g., `radius`), a label and a description. | 
 | In addition, we can assign a _value specification_ to a parameter, indicating the value range we support. | 
 | Our example will support a radius value between 0 and 1000 with a granularity of 1. | 
 | In the StreamPipes UI, a required text parameter is rendered as a text input field. If we provide an optional value | 
 | specification, a slider input is generated automatically. | 
 |  | 
 | So far, we've defined each label and description inline, next to the internal ID. | 
 | To decouple human-readable labels and descriptions from the actual data processor description, it is possible to extract the strings to a properties file. | 
 | In the `resources` folder, switch to the folder with the same name as the data processor's ID. If you've used the Maven archetype to build your project, there should be a `strings.en` file. | 
 | In this file, we can configure labels and descriptions. For instance, instead of writing | 
 |  | 
 | ```java | 
 |  | 
 | .requiredPropertyWithUnaryMapping( | 
 |         EpRequirements.domainPropertyReq(Geo.LAT), | 
 |         Labels.from("latitude-field","Latitude","The event property containing the latitude value"), | 
 |         PropertyScope.MEASUREMENT_PROPERTY | 
 |     ) | 
 |  | 
 | ``` | 
 |  | 
 | it is recommended to write | 
 |  | 
 | ```java | 
 |  | 
 | .requiredPropertyWithUnaryMapping( | 
 |         EpRequirements.domainPropertyReq(Geo.LAT), | 
 |         Labels.withId("latitude-field"),  | 
 |         PropertyScope.MEASUREMENT_PROPERTY | 
 |     ) | 
 |  | 
 | ``` | 
 |  | 
 | and add the following lines to the `strings.en` file: | 
 |  | 
 | ```properties | 
 |  | 
 | latitude-field.title=Latitude | 
 | latitude-field.description=The event property containing the latitude value | 
 |  | 
 | ``` | 
 |  | 
 | This feature will also ease future internationalization efforts. | 
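
The key layout of the `strings.en` file can be explored with plain Java. The sketch below assumes the file follows standard Java properties syntax and uses the `<internalId>.title` / `<internalId>.description` key pattern from the snippet above; how StreamPipes itself loads the file is not shown in this tutorial, and `StringsFileDemo` is a hypothetical name.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper class for illustration only; not part of the StreamPipes SDK.
class StringsFileDemo {

  // Parses text written in Java properties syntax into key/value pairs.
  static Properties parse(String content) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(content));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return props;
  }

  public static void main(String[] args) {
    String content = String.join("\n",
        "latitude-field.title=Latitude",
        "latitude-field.description=The event property containing the latitude value");
    Properties props = parse(content);

    // Keys are built from the internal ID passed to Labels.withId(...).
    String internalId = "latitude-field";
    System.out.println(props.getProperty(internalId + ".title"));
    System.out.println(props.getProperty(internalId + ".description"));

    // A key that does not match the internal ID exactly yields no value.
    System.out.println(props.getProperty("latitute-field.description"));
  }
}
```

Because keys are matched literally, the prefix of each key has to be identical to the internal ID used in the processor description.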
 |  | 
 | Besides requirements, users should be able to define the center coordinate of the Geofence and the size of the fence | 
 | defined as a radius around the center in meters. | 
 | The radius can be defined by adding a required integer parameter to the description: | 
 |  | 
 | ```java | 
 | .requiredIntegerParameter("radius","Geofence Size","The size of the circular geofence in meters.",0,1000,1) | 
 | ``` | 
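
The last three arguments of the call above are the minimum, the maximum and the step size of the allowed values. The following stand-alone sketch illustrates what such a value specification means for a concrete input. It is not how the StreamPipes UI enforces the range; `ValueSpecDemo` and `conforms` are hypothetical names.

```java
// Hypothetical illustration of a value specification (min, max, step).
class ValueSpecDemo {

  // Returns true if value lies in [min, max] and sits on the step grid starting at min.
  static boolean conforms(int value, int min, int max, int step) {
    return value >= min && value <= max && (value - min) % step == 0;
  }

  public static void main(String[] args) {
    // Same numbers as the radius parameter: minimum 0, maximum 1000, step 1.
    System.out.println("250 allowed:  " + conforms(250, 0, 1000, 1));
    System.out.println("1500 allowed: " + conforms(1500, 0, 1000, 1));
  }
}
```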
 |  | 
 | Such user-defined parameters are called _static properties_. There are many different types of static properties (see | 
 | the [Processor SDK](06_extend-sdk-static-properties.md) for an overview). As with stream requirements, we recommend using `Labels.withId("radius")` and moving labels and descriptions to the resource file. | 
 |  | 
 | In this example, we'll further add two very simple input fields to let users provide latitude and longitude of the | 
 | geofence center. | 
 |  | 
 | Add the following lines to the `declareConfig` method: | 
 |  | 
 | ```java | 
 | .requiredFloatParameter(Labels.from(LATITUDE_CENTER,"Latitude","The latitude value")) | 
 | .requiredFloatParameter(Labels.from(LONGITUDE_CENTER,"Longitude","The longitude value")) | 
 | ``` | 
 |  | 
 | Now we need to define the output of our Geofencing pipeline element. | 
 | As explained in the first section, the element should fire every time some geo-located entity arrives within the defined | 
 | geofence. | 
 | Therefore, the processor outputs the same schema as it receives as an input. | 
 | Although we don't know the exact input right now as it depends on the stream users connect in StreamPipes when creating | 
 | pipelines, we can define an _output strategy_ as follows: | 
 |  | 
 | ```java | 
 | .outputStrategy(OutputStrategies.keep()) | 
 | ``` | 
 |  | 
 | This defines a _KeepOutputStrategy_, i.e., the input event schema is not modified by the processor. | 
 | There are many more output strategies you can define depending on the functionality you desire, e.g., _AppendOutput_ for | 
 | defining a processor that enriches events or _CustomOutput_ in case you would like users to select the output by | 
 | themselves. | 
 |  | 
 | That's it! We've now defined input requirements, required user input and an output strategy. | 
 | In the next section, you will learn how to extract these parameters once the pipeline element is invoked after a | 
 | pipeline was created. | 
 |  | 
 | ## Pipeline element invocation | 
 |  | 
 | Once users start a pipeline that uses our geofencing component, the `onPipelineStarted` method in our class is called. The | 
 | interface `IDataProcessorParameters` provides convenient access to the parameters a user has configured in the pipeline | 
 | editor and to information on the actual streams that are connected to the pipeline element. | 
 |  | 
 | First, we are interested in the fields of the input event stream that contain the latitude and longitude values we would | 
 | like to compare against the geofence center location. We extract the field names as follows: | 
 |  | 
 | ```java | 
 |   String latitudeFieldName = params.extractor().mappingPropertyValue("latitude-field"); | 
 |   String longitudeFieldName = params.extractor().mappingPropertyValue("longitude-field"); | 
 | ``` | 
 |  | 
 | We use the same `internalId` we've used to define the mapping property requirements in the `declareConfig` method. | 
 |  | 
 | Next, to extract the geofence center coordinates, add two class variables, `centerLatitude` and `centerLongitude`, and | 
 | assign the selected values using the following statements: | 
 |  | 
 | ```java | 
 |   this.centerLatitude = params.extractor().singleValueParameter(LATITUDE_CENTER,Float.class); | 
 |   this.centerLongitude = params.extractor().singleValueParameter(LONGITUDE_CENTER,Float.class); | 
 | ``` | 
 |  | 
 | The radius value can be extracted as follows: | 
 |  | 
 | ```java | 
 |   int radius = params.extractor().singleValueParameter("radius",Integer.class); | 
 | ``` | 
 |  | 
 | Great! That's all we need to describe a data processor for usage in StreamPipes. Your processor class should look as | 
 | follows: | 
 |  | 
 | ```java | 
 |  | 
 | package org.apache.streampipes.pe.example; | 
 |  | 
 | import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor; | 
 | import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration; | 
 | import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext; | 
 | import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters; | 
 | import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector; | 
 | import org.apache.streampipes.model.DataProcessorType; | 
 | import org.apache.streampipes.model.runtime.Event; | 
 | import org.apache.streampipes.model.schema.PropertyScope; | 
 | import org.apache.streampipes.sdk.builder.ProcessingElementBuilder; | 
 | import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; | 
 | import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration; | 
 | import org.apache.streampipes.sdk.helpers.EpRequirements; | 
 | import org.apache.streampipes.sdk.helpers.Labels; | 
 | import org.apache.streampipes.sdk.helpers.Locales; | 
 | import org.apache.streampipes.sdk.helpers.OutputStrategies; | 
 | import org.apache.streampipes.sdk.utils.Assets; | 
 | import org.apache.streampipes.vocabulary.Geo; | 
 |  | 
 | public class GeofencingProcessor implements IStreamPipesDataProcessor { | 
 |  | 
 |   private static final String LATITUDE_CENTER = "latitude-center"; | 
 |   private static final String LONGITUDE_CENTER = "longitude-center"; | 
 |  | 
 |   private float centerLatitude; | 
 |   private float centerLongitude; | 
 |   private String latitudeFieldName; | 
 |   private String longitudeFieldName; | 
 |  | 
 |   private int radius; | 
 |  | 
 |   @Override | 
 |   public IDataProcessorConfiguration declareConfig() { | 
 |     return DataProcessorConfiguration.create( | 
 |         GeofencingProcessor::new, | 
 |         ProcessingElementBuilder.create("org.apache.streampipes.tutorial-geofencing") | 
 |             .category(DataProcessorType.ENRICH) | 
 |             .withAssets(Assets.DOCUMENTATION, Assets.ICON) | 
 |             .withLocales(Locales.EN) | 
 |             .requiredStream(StreamRequirementsBuilder | 
 |                 .create() | 
 |                 .requiredPropertyWithUnaryMapping(EpRequirements.domainPropertyReq(Geo.LAT), | 
 |                     Labels.from("latitude-field", "Latitude", "The event " + | 
 |                         "property containing the latitude value"), PropertyScope.MEASUREMENT_PROPERTY) | 
 |                 .requiredPropertyWithUnaryMapping(EpRequirements.domainPropertyReq(Geo.LNG), | 
 |                     Labels.from("longitude-field", "Longitude", "The event " + | 
 |                         "property containing the longitude value"), PropertyScope.MEASUREMENT_PROPERTY) | 
 |                 .build()) | 
 |             .outputStrategy(OutputStrategies.keep()) | 
 |             .requiredIntegerParameter("radius", "Geofence Size", "The size of the circular geofence in meters.", 0, 1000, 1) | 
 |             .requiredFloatParameter(Labels.from(LATITUDE_CENTER, "Latitude", "The latitude value")) | 
 |             .requiredFloatParameter(Labels.from(LONGITUDE_CENTER, "Longitude", "The longitude value")) | 
 |             .build() | 
 |     ); | 
 |   } | 
 |  | 
 |   @Override | 
 |   public void onPipelineStarted(IDataProcessorParameters params, | 
 |                                 SpOutputCollector collector, | 
 |                                 EventProcessorRuntimeContext runtimeContext) { | 
 |     this.centerLatitude = params.extractor().singleValueParameter(LATITUDE_CENTER, Float.class); | 
 |     this.centerLongitude = params.extractor().singleValueParameter(LONGITUDE_CENTER, Float.class); | 
 |     this.latitudeFieldName = params.extractor().mappingPropertyValue("latitude-field"); | 
 |     this.longitudeFieldName = params.extractor().mappingPropertyValue("longitude-field"); | 
 |     this.radius = params.extractor().singleValueParameter("radius", Integer.class); | 
 |   } | 
 |  | 
 |   @Override | 
 |   public void onEvent(Event event, | 
 |                       SpOutputCollector collector) { | 
 |  | 
 |   } | 
 |  | 
 |   @Override | 
 |   public void onPipelineStopped() { | 
 |  | 
 |   } | 
 | } | 
 |  | 
 | ``` | 
 |  | 
 | ## Adding an implementation | 
 |  | 
 | All we need to do now is to add an implementation. | 
 |  | 
 | Add the following piece of code to the `onEvent` method, which realizes the geofencing functionality: | 
 |  | 
 | ```java | 
 |  | 
 |     @Override | 
 |     public void onEvent(Event event,  | 
 |                         SpOutputCollector collector) { | 
 |         float latitude = event.getFieldBySelector(latitudeFieldName).getAsPrimitive().getAsFloat(); | 
 |         float longitude = event.getFieldBySelector(longitudeFieldName).getAsPrimitive().getAsFloat(); | 
 |      | 
 |         float distance = distFrom(latitude,longitude, centerLatitude, centerLongitude); | 
 |      | 
 |         if(distance <= radius){ | 
 |           collector.collect(event); | 
 |         } | 
 |     } | 
 |      | 
 |     public static float distFrom(float lat1, float lng1, float lat2, float lng2) { | 
 |         double earthRadius = 6371000; | 
 |         double dLat = Math.toRadians(lat2-lat1); | 
 |         double dLng = Math.toRadians(lng2-lng1); | 
 |         double a = Math.sin(dLat/2)*Math.sin(dLat/2) + | 
 |         Math.cos(Math.toRadians(lat1))*Math.cos(Math.toRadians(lat2)) * | 
 |         Math.sin(dLng/2)*Math.sin(dLng/2); | 
 |          | 
 |         double c = 2*Math.atan2(Math.sqrt(a),Math.sqrt(1-a)); | 
 |          | 
 |         return(float)(earthRadius*c); | 
 |     } | 
 | ``` | 
 |  | 
 | We won't go into details here as this isn't StreamPipes-related code, but in general the method extracts the latitude and | 
 | longitude fields from the input event (which is provided as a map data type) and calculates the distance between the | 
 | geofence center and these coordinates. | 
 | If the distance is less than or equal to the given radius, the event is forwarded to the next operator. | 
 |  | 
 | See the [event model](06_extend-sdk-event-model.md) guide to learn how to extract parameters from events. | 
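
To get a feeling for the numbers, the following stand-alone sketch applies the same distance check to plain Java maps instead of StreamPipes `Event` objects. The field names `latitude` and `longitude`, the center coordinates and the class name `GeofenceDemo` are assumptions made for this illustration.

```java
import java.util.List;
import java.util.Map;

// Stand-alone illustration of the geofence check; does not use the StreamPipes SDK.
class GeofenceDemo {

  // Haversine great-circle distance in meters, same formula as distFrom in the processor.
  static float distFrom(float lat1, float lng1, float lat2, float lng2) {
    double earthRadius = 6371000;
    double dLat = Math.toRadians(lat2 - lat1);
    double dLng = Math.toRadians(lng2 - lng1);
    double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
        + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
        * Math.sin(dLng / 2) * Math.sin(dLng / 2);
    double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
    return (float) (earthRadius * c);
  }

  // True if the event's coordinates lie within radius meters of the center.
  static boolean insideFence(Map<String, Float> event, float centerLat, float centerLng, int radius) {
    return distFrom(event.get("latitude"), event.get("longitude"), centerLat, centerLng) <= radius;
  }

  public static void main(String[] args) {
    float centerLat = 49.0f;
    float centerLng = 8.4f;
    List<Map<String, Float>> events = List.of(
        Map.of("latitude", 49.0f, "longitude", 8.4f),    // at the center
        Map.of("latitude", 49.005f, "longitude", 8.4f),  // roughly 556 m north
        Map.of("latitude", 49.02f, "longitude", 8.4f));  // roughly 2.2 km north
    for (Map<String, Float> event : events) {
      System.out.println(event + " inside: " + insideFence(event, centerLat, centerLng, 1000));
    }
  }
}
```

With a radius of 1000 meters, the first two events pass the check and the third one is dropped, since one degree of latitude corresponds to roughly 111 km.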
 |  | 
 | ## Registering the pipeline element | 
 |  | 
 | The final step is to register the data processor in the `Init` class. Add the following line to | 
 | the `SpServiceDefinitionBuilder`: | 
 |  | 
 | ```java | 
 |  .registerPipelineElement(new GeofencingProcessor()) | 
 | ``` | 
 |  | 
 | ## Starting the service | 
 |  | 
 | :::tip | 
 |  | 
 | Once you start the service, it will register in StreamPipes with the hostname. The hostname will be auto-discovered and | 
 | should work out-of-the-box. | 
 | In some cases, the detected hostname is not resolvable from within a container (where the core is running). In this | 
 | case, provide an `SP_HOST` environment variable to override the auto-discovery. | 
 |  | 
 | ::: | 
 |  | 
 | :::tip | 
 |  | 
 | The default port of all pipeline element services as defined in the `create` method is port 8090. | 
 | If you'd like to run multiple services at the same time on your development machine, change the port here. As an | 
 | alternative, you can also provide an env variable `SP_PORT` which overrides the port settings. This is useful to use | 
 | different configs for dev and prod environments. | 
 |  | 
 | ::: | 
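
The port override described in the tip follows a common pattern: read an environment variable and fall back to a default. The sketch below illustrates that pattern only and is not the StreamPipes implementation; `PortConfigDemo` and `resolvePort` are hypothetical names, and falling back to the default for an unparsable value is an assumption made here.

```java
// Hypothetical illustration of an environment-variable override with a default.
class PortConfigDemo {

  static final int DEFAULT_PORT = 8090;

  // Returns the port given in envValue, or DEFAULT_PORT if it is unset or not a number.
  static int resolvePort(String envValue) {
    if (envValue == null || envValue.isBlank()) {
      return DEFAULT_PORT;
    }
    try {
      return Integer.parseInt(envValue.trim());
    } catch (NumberFormatException e) {
      return DEFAULT_PORT;
    }
  }

  public static void main(String[] args) {
    // Prints 8090 unless SP_PORT is set to a valid number in the environment.
    System.out.println(resolvePort(System.getenv("SP_PORT")));
  }
}
```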
 |  | 
 | Now we are ready to start our service! | 
 |  | 
 | Configure your IDE to provide an environment variable called ``SP_DEBUG`` with value ``true`` when starting the project. | 
 |  | 
 | Execute the main method in the class `Init`. | 
 |  | 
 | The service automatically registers itself in StreamPipes. | 
 | To install the newly created element, open the StreamPipes UI and follow the instructions in | 
 | the [user guide](03_use-install-pipeline-elements.md). | 
 |  | 
 | ## Read more | 
 |  | 
 | Congratulations! You've just created your first data processor for StreamPipes. | 
 | There are many more things to explore and data processors can be defined in much more detail using multiple wrappers. | 
 | Follow our [SDK guide](06_extend-sdk-static-properties.md) to see what's possible! |