Version 0.61.0 of StreamPipes comes with an improved event model. This model makes it easier to support more complex data streams (e.g., streams containing nested properties and lists) and includes features such as automatically resolving conflicts when merging two event streams.
If you are only using the pipeline elements that are included in StreamPipes, you only need to update the element description (My Elements -> Update). However, if you've already developed your own pipeline elements, some code changes are required to make your elements work with versions >= 0.61.0.
EventProcessor
instead of extending StandaloneEventProcessorEngine
// old public class MyProcessor extends StandaloneEventProcessorEngine<MyProcessorParameters> { ... } // new public class MyProcessor implements EventProcessor<MyProcessorParameters>{ ... }
onInvocation
method:// old @Override public void onInvocation(MyProcessorParameters params, DataProcessorInvocation dataProcessorInvocation) { } // new @Override public void onInvocation(MyProcessorParameters params, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException { }
onEvent
method:// old @Override public void onEvent(Map<String, Object> in, String s, SpOutputCollector out) { } // new @Override public void onEvent(Event in, SpOutputCollector out) throws SpRuntimeException { }
onEvent
method to the new event model:collector.onEvent()
calls to the collector with collector.collect()
// old @Override public void onEvent(Map<String, Object> in, String s, SpOutputCollector out) { String value = String.valueOf(in.get(valueField)); } // new @Override public void onEvent(Event in, SpOutputCollector out) throws SpRuntimeException { String value = in.getFieldBySelector(valueField).getAsPrimitive().getAsString(); }
See the documentation on the event class for further details.
Forward an Event
instead of a Map
to the collector. If needed, create a new instance of the Event
class.
Modify the modifications of the input event, e.g.:
// old @Override public void onEvent(Map<String, Object> in, String s, SpOutputCollector out) { in.put("new", "a new field"); out.onEvent(in); } // new @Override public void onEvent(Event in, SpOutputCollector out) throws SpRuntimeException { in.addField("new", "a new field"); out.collect(in); }
onInvocation
method, use a method reference instead of the lambda expression as return type:// old return new ConfiguredEventProcessor<>(params, () -> new MyProcessor(params)); // new return new ConfiguredEventProcessor<>(params, MyProcessor::new);
onInvocation
method:// old @Override public ConfiguredEventProcessor<MyParameters> onInvocation(DataProcessorInvocation graph) { ... } // new @Override public ConfiguredEventProcessor<MyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) { ... }
fromExtractor
method and use the provided ProcessingElementParameterExtractor
EventSink
instead of extending EventSink
// old public class MySink extends EventSink<MySinkParameters> { ... } // new public class MySink implements EventSink<MySinkParameters>{ ... }
If present, remove the constructor that includes the parameter class.
Change the bind
method to onInvocation
as follows:
// old @Override public void bind(DemonstratorValveParameters parameters) throws SpRuntimeException { ... } // new @Override public void onInvocation(DemonstratorValveParameters parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException { ... }
onEvent
method// old @Override public void onEvent(Map<String, Object> event, String sourceInfo) { // new @Override public void onEvent(Event event) {
If necessary, adapt your logic to use the new event object.
Rename the discard
method to onDetach
.
onInvocation
method, use a method reference instead of the lambda expression as return type:// old return new ConfiguredEventSink<>(params, () -> new MySink(params)); // new return new ConfiguredEventSink<>(params, MySink::new);