blob: 6f09a3046b86e4256308e524bff8ff096d7a61e4 [file] [log] [blame]
/*
* Copyright (c) 2011 Yahoo! Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific
* language governing permissions and limitations under the
* License. See accompanying LICENSE file.
*/
package io.s4;
import io.s4.example.model.Controller;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Stream<T extends Event> implements Runnable {
Logger logger = LoggerFactory.getLogger(Stream.class);
final static private String DEFAULT_SEPARATOR = "^";
final static private int CAPACITY = 1000;
final private String name;
final private Key<T> key;
final private ProcessingElement[] targetPEs;
final private BlockingQueue<T> queue = new ArrayBlockingQueue<T>(CAPACITY);
final private Thread thread;
/*
* Streams send event of a given type using a specific key to target
* processing elements.
*/
public Stream(App app, String name, KeyFinder<T> finder,
ProcessingElement... processingElements) {
app.addStream(this);
this.name = name;
if (finder == null) {
this.key = null;
} else {
this.key = new Key<T>(finder, DEFAULT_SEPARATOR);
}
this.targetPEs = processingElements;
/* Start streaming. */
/*
* TODO: This is only for prototyping. Comm layer will take care of this
* in the real implementation.
*/
// logger.trace("Start thread for stream " + name);
thread = new Thread(this, name);
thread.start();
}
/*
* This constructor will create a broadcast stream. That is, the events will
* be sent to all the PE instances.
*/
public Stream(App app, String name, ProcessingElement... processingElements) {
this(app, name, null, processingElements);
}
public void put(T event) {
try {
// System.out.println("Remaining capacity in stream " + name + ":" +
// queue.remainingCapacity());
// System.out.println("PUT: " + event);
queue.put(event);
} catch (InterruptedException e) {
e.printStackTrace();
System.exit(-1);
}
}
/**
* @return the name
*/
public String getName() {
return name;
}
/**
* @return the key
*/
public Key<T> getKey() {
return key;
}
/**
* @return the list of target processing elements.
*/
public ProcessingElement[] getTargetPEs() {
return targetPEs;
}
public void close() {
thread.interrupt();
}
@Override
public void run() {
while (true) {
try {
/* Get oldest event in queue. */
T event = queue.take();
/* Send event to each target PE. */
for (int i = 0; i < targetPEs.length; i++) {
if (key == null) {
/* Broadcast to all PE instances! */
/* STEP 1: find all PE instances. */
List<ProcessingElement> pes = targetPEs[i].getAllInstances();
/* STEP 2: iterate and pass event to PE instance. */
for(ProcessingElement pe : pes) {
pe.handleInputEvent(event);
}
} else {
/* We have a key, send to target PE. */
/* STEP 1: find the PE instance for key. */
ProcessingElement pe = targetPEs[i]
.getInstanceForKey(key.get(event));
/* STEP 2: pass event to PE instance. */
pe.handleInputEvent(event);
}
}
} catch (InterruptedException e) {
logger.info("Closing stream {}.", name);
return;
}
}
}
}