blob: bffbad29b999aa7a523bad5cce9157e54796eb7b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.streampipes.function.example;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.function.FunctionId;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.DataStreamBuilder;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.Formats;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Protocols;
import org.apache.streampipes.vocabulary.SO;
import org.apache.streampipes.wrapper.standalone.function.FunctionConfig;
import org.apache.streampipes.wrapper.standalone.function.FunctionConfigBuilder;
import org.apache.streampipes.wrapper.standalone.function.FunctionContext;
import org.apache.streampipes.wrapper.standalone.function.StreamPipesFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class FunctionPublishExample extends StreamPipesFunction {
private static final Logger LOG = LoggerFactory.getLogger(FunctionPublishExample.class);
private static final String FUNCTION_ID = "org.apache.streampipes.example.function.publish";
private static final String STREAM_APP_ID = "example-output-stream-1";
private SpOutputCollector outputCollector;
@Override
public List<String> requiredStreamIds() {
return List.of("urn:streampipes.apache.org:eventstream:plSEjN");
}
@Override
public void onServiceStarted(FunctionContext context) {
LOG.info("Service started");
this.outputCollector = context.getOutputCollectors().get(STREAM_APP_ID);
}
@Override
public void onEvent(Event event, String streamId) {
LOG.info("on event");
var ev = new Event();
ev.addField("timestamp", System.currentTimeMillis());
ev.addField("example-property", "abc");
this.outputCollector.collect(ev);
}
@Override
public void onServiceStopped() {
LOG.info("service stopped");
}
@Override
public FunctionConfig getFunctionConfig() {
return FunctionConfigBuilder
.create(FunctionId.from(FUNCTION_ID, 1))
.withOutputStream(DataStreamBuilder.create(STREAM_APP_ID, "My Function Stream", "")
.property(EpProperties.timestampProperty("timestamp"))
.property(EpProperties.stringEp(
Labels.from("my-example-property", "test", "test"),
"example-property",
SO.TEXT,
PropertyScope.MEASUREMENT_PROPERTY))
.format(Formats.jsonFormat())
.protocol(Protocols.kafka("nats", 9094, STREAM_APP_ID))
.build())
.build();
}
}