blob: 342f2c3c52eb73be9d24d75964a3f1f2c4ca620d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package streamer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import streamer.debug.FakeSink;
import streamer.debug.FakeSource;
public class PipelineImpl implements Pipeline {
protected String id;
protected boolean verbose = System.getProperty("streamer.Pipeline.debug", "false").equals("true");
public PipelineImpl(String id) {
this.id = id;
elements = initElementMap(id);
}
protected Map<String, Element> elements;
protected HashMap<String, Element> initElementMap(String id) {
HashMap<String, Element> map = new HashMap<String, Element>();
map.put(IN, new BaseElement(id + "." + IN));
map.put(OUT, new BaseElement(id + "." + OUT));
return map;
}
@Override
public Link getLink(String padName) {
Link link = elements.get(IN).getLink(padName);
if (link == null)
link = elements.get(OUT).getLink(padName);
return link;
}
@Override
public Set<String> getPads(Direction direction) {
switch (direction) {
case IN:
return elements.get(IN).getPads(direction);
case OUT:
return elements.get(OUT).getPads(direction);
}
return null;
}
@Override
public void validate() {
for (Element element : elements.values())
element.validate();
// Check IN element
{
Element element = get(IN);
int outPadsNumber = element.getPads(Direction.OUT).size();
int inPadsNumber = element.getPads(Direction.IN).size();
if ((outPadsNumber | inPadsNumber) > 0 && (outPadsNumber == 0 || inPadsNumber == 0))
throw new RuntimeException("[ " + this + "] Pads of input element of pipeline are not balanced. Element: " + element + ", output pads: "
+ element.getPads(Direction.OUT).toString() + ", input pads: " + element.getPads(Direction.IN).toString() + ".");
}
// Check OUT element
{
Element element = get(OUT);
int outPadsNumber = element.getPads(Direction.OUT).size();
int inPadsNumber = element.getPads(Direction.IN).size();
if ((outPadsNumber | inPadsNumber) > 0 && (outPadsNumber == 0 || inPadsNumber == 0))
throw new RuntimeException("[ " + this + "] Pads of output element of pipeline are not balanced. Element: " + element + ", output pads: "
+ element.getPads(Direction.OUT).toString() + ", input pads: " + element.getPads(Direction.IN).toString() + ".");
}
}
@Override
public void dropLink(String padName) {
if (elements.get(IN).getLink(padName) != null)
elements.get(IN).dropLink(padName);
if (elements.get(OUT).getLink(padName) != null)
elements.get(OUT).dropLink(padName);
}
@Override
public void dropLink(Link link) {
elements.get(IN).dropLink(link);
elements.get(OUT).dropLink(link);
}
@Override
public void replaceLink(Link existingLink, Link newLink) {
elements.get(IN).replaceLink(existingLink, newLink);
elements.get(OUT).replaceLink(existingLink, newLink);
}
@Override
public void setLink(String padName, Link link, Direction direction) {
// Wire links to internal elements instead
elements.get(direction.toString()).setLink(padName, link, direction);
}
@Override
public void poll(boolean block) {
throw new RuntimeException("Not implemented.");
}
@Override
public void handleData(ByteBuffer buf, Link link) {
get(IN).handleData(buf, link);
}
@Override
public void handleEvent(Event event, Direction direction) {
switch (direction) {
case IN:
get(IN).handleEvent(event, direction);
break;
case OUT:
get(OUT).handleEvent(event, direction);
break;
}
}
@Override
public void add(Element... elements) {
for (Element element : elements) {
String id = element.getId();
if (this.elements.containsKey(id))
throw new RuntimeException("This pipeline already contains element with same ID. New element: " + element + ", existing element: "
+ this.elements.get(id) + ".");
this.elements.put(id, element);
}
}
@Override
public void link(String... elementNames) {
elementNames = filterOutEmptyStrings(elementNames);
if (elementNames.length < 2)
throw new RuntimeException("At least two elements are necessary to create link between them.");
// Parse array of element and pad names
Element elements[] = new Element[elementNames.length];
String inputPads[] = new String[elementNames.length];
String outputPads[] = new String[elementNames.length];
int i = 0;
for (String elementName : elementNames) {
if (elementName.contains("< ")) {
inputPads[i] = elementName.substring(0, elementName.indexOf("< "));
elementName = elementName.substring(elementName.indexOf("< ") + 2);
} else {
inputPads[i] = STDIN;
}
if (elementName.contains(" >")) {
outputPads[i] = elementName.substring(elementName.indexOf(" >") + 2);
elementName = elementName.substring(0, elementName.indexOf(" >"));
} else {
outputPads[i] = STDOUT;
}
elements[i] = get(elementName);
if (elements[i] == null)
throw new RuntimeException("Cannot find element by name in this pipeline. Element name: \"" + elementName + "\" (" + elementNames[i] + "), pipeline: "
+ this + ".");
i++;
}
// Link elements
for (i = 0; i < elements.length - 1; i++) {
Element leftElement = elements[i];
Element rightElement = elements[i + 1];
String leftPad = outputPads[i];
String rightPad = inputPads[i + 1];
String linkId = leftElement.getId() + " >" + leftPad + " | " + rightPad + "< " + rightElement.getId();
if (verbose)
System.out.println("[" + this + "] INFO: Linking: " + linkId + ".");
Link link = new SyncLink(linkId);
leftElement.setLink(leftPad, link, Direction.OUT);
rightElement.setLink(rightPad, link, Direction.IN);
}
}
/**
* Filter out empty strings from array and return new array with non-empty
* elements only. If array contains no empty string, returns same array.
*/
private String[] filterOutEmptyStrings(String[] strings) {
boolean found = false;
for (String string : strings) {
if (string == null || string.isEmpty()) {
found = true;
break;
}
}
if (!found)
return strings;
List<String> filteredStrings = new ArrayList<String>(strings.length);
for (String string : strings)
if (string != null && !string.isEmpty())
filteredStrings.add(string);
return filteredStrings.toArray(new String[filteredStrings.size()]);
}
@Override
public void addAndLink(Element... elements) {
add(elements);
link(elements);
}
private void link(Element... elements) {
String elementNames[] = new String[elements.length];
int i = 0;
for (Element element : elements) {
elementNames[i++] = element.getId();
}
link(elementNames);
}
@Override
public Element get(String elementName) {
return elements.get(elementName);
}
@Override
public Link getLink(String elementName, String padName) {
return elements.get(elementName).getLink(padName);
}
@Override
public void setLink(String elementName, String padName, Link link, Direction direction) {
elements.get(elementName).setLink(padName, link, direction);
}
@Override
public String getId() {
return id;
}
@Override
public void runMainLoop(String elementName, String padName, boolean separateThread, boolean waitForStartEvent) {
validate();
Link link = getLink(elementName, padName);
if (link == null)
throw new NullPointerException("Cannot find link. Element name: " + elementName + ", element: " + get(elementName) + ", pad: " + padName + ".");
if (!waitForStartEvent)
link.sendEvent(Event.STREAM_START, Direction.OUT);
if (separateThread) {
Thread thread = new Thread(link);
thread.setDaemon(true);
thread.start();
} else {
link.run();
}
}
@Override
public String toString() {
return "Pipeline(" + id + ")";
}
/**
* Example.
*/
public static void main(String args[]) {
// System.setProperty("streamer.Link.debug", "true");
// System.setProperty("streamer.Element.debug", "true");
// System.setProperty("streamer.Pipeline.debug", "true");
Pipeline pipeline = new PipelineImpl("main");
// Create elements
pipeline.add(new FakeSource("source") {
{
incomingBufLength = 3;
numBuffers = 10;
delay = 100;
}
});
pipeline.add(new BaseElement("tee"));
pipeline.add(new FakeSink("sink") {
{
verbose = true;
}
});
pipeline.add(new FakeSink("sink2") {
{
verbose = true;
}
});
// Link elements
pipeline.link("source", "tee", "sink");
pipeline.link("tee >out2", "sink2");
// Run main loop
pipeline.runMainLoop("source", STDOUT, false, false);
}
}