blob: 300454c14f2e0c3ff80de4cbc36ca1d5acde8091 [file] [log] [blame]
package org.apache.streampipes.pe.examples.jvm.staticproperty;
import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.staticproperty.RuntimeResolvableTreeInputStaticProperty;
import org.apache.streampipes.model.staticproperty.StaticProperty;
import org.apache.streampipes.model.staticproperty.TreeInputNode;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.SupportedFormats;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
 * Example data sink that demonstrates a runtime-resolvable tree input.
 *
 * <p>The sink declares one text parameter and one tree input. When the tree
 * input is resolved via {@link #resolveConfiguration}, it is filled with a
 * fixed, hard-coded sample tree ("Fruit" and "Vegetable" branches). The
 * pipeline lifecycle callbacks are empty in this example.
 */
public class TreeInputSink implements IStreamPipesDataSink, SupportsRuntimeConfig {

  /** Internal name of the text parameter. */
  private static final String SP_KEY = "example-key";

  /** Internal name of the tree input static property. */
  private static final String TREE_KEY = "tree-key";

  /**
   * Declares the sink: one input stream accepting any property, Kafka/JSON
   * grounding, a text parameter and a runtime-resolvable tree input.
   *
   * @return the configuration describing this sink
   */
  @Override
  public DataSinkConfiguration declareConfig() {
    return DataSinkConfiguration.create(
        TreeInputSink::new,
        DataSinkBuilder
            .create("org.apache.streampipes.examples.treeinput", "Tree Input Example", "")
            .requiredStream(
                StreamRequirementsBuilder
                    .create()
                    .requiredProperty(EpRequirements.anyProperty())
                    .build())
            .supportedProtocols(SupportedProtocols.kafka())
            .supportedFormats(SupportedFormats.jsonFormat())
            // plain text parameter; its key is handed to the tree input below
            .requiredTextParameter(
                Labels.from(SP_KEY, "Example Key", "required by tree input"))
            // NOTE(review): the key list presumably names the properties the tree
            // depends on, and the meaning of the boolean flag is not visible
            // here - confirm against the DataSinkBuilder API.
            .requiredRuntimeResolvableTreeInput(
                Labels.from(TREE_KEY, "Tree", "The tree input"),
                Collections.singletonList(SP_KEY),
                true)
            .build()
    );
  }

  /** No-op in this example. */
  @Override
  public void onPipelineStarted(IDataSinkParameters params,
                                EventSinkRuntimeContext runtimeContext) {
  }

  /** No-op in this example; incoming events are ignored. */
  @Override
  public void onEvent(Event event) {
  }

  /** No-op in this example. */
  @Override
  public void onPipelineStopped() {
  }

  /**
   * Looks up the tree input static property by the given internal name and
   * replaces its nodes with the hard-coded sample tree.
   *
   * @param staticPropertyInternalName internal name of the property to resolve
   * @param extractor                  extractor used to look up the property
   * @return the tree input static property with the sample nodes set
   */
  @Override
  public StaticProperty resolveConfiguration(String staticPropertyInternalName,
                                             IStaticPropertyExtractor extractor) {
    RuntimeResolvableTreeInputStaticProperty resolved = extractor.getStaticPropertyByName(
        staticPropertyInternalName,
        RuntimeResolvableTreeInputStaticProperty.class);
    resolved.setNodes(sampleTree());
    return resolved;
  }

  /** Builds the two top-level branches of the sample tree. */
  private List<TreeInputNode> sampleTree() {
    TreeInputNode fruit = branch("Fruit", fruitNodes());
    TreeInputNode vegetable = branch("Vegetable", vegetableNodes());
    return Arrays.asList(fruit, vegetable);
  }

  /** Children of the "Fruit" branch, including a nested "Sour" branch. */
  private List<TreeInputNode> fruitNodes() {
    TreeInputNode apple = leaf("Apple");
    TreeInputNode banana = leaf("Banana");
    TreeInputNode orange = leaf("Orange");
    List<TreeInputNode> sourFruits =
        Arrays.asList(leaf("Lemon"), leaf("Guava"), leaf("Cranberry"));
    return Arrays.asList(apple, banana, orange, branch("Sour", sourFruits));
  }

  /** Children of the "Vegetable" branch. */
  private List<TreeInputNode> vegetableNodes() {
    return Arrays.asList(leaf("Tomato"), leaf("Pepper"), leaf("Carrot"));
  }

  /** Creates a named node and attaches the given children to it. */
  private TreeInputNode branch(String nodeName, List<TreeInputNode> children) {
    TreeInputNode parent = leaf(nodeName);
    parent.setChildren(children);
    return parent;
  }

  /** Creates a node on which only the name is set. */
  private TreeInputNode leaf(String nodeName) {
    TreeInputNode created = new TreeInputNode();
    created.setNodeName(nodeName);
    return created;
  }
}