blob: 1f89d86d441ea9d05d8f341d0fd4ab3458d76525 [file] [log] [blame]
package org.apache.s4.core;
import java.util.HashSet;
import java.util.Set;
import org.apache.s4.base.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* A producer app uses one or more EventSource classes to provide events to streamables. At runtime, consumer apps
* subscribe to an event source by providing a streamable object. Each EventSource instance may correspond to a
* different type of event stream. Each EventSource may have an unlimited number of subscribers.
*
*/
public class EventSource implements Streamable {
/* No need to synchronize this object because we expect a single thread. */
private Set<Streamable> streamables = new HashSet<Streamable>();
private static final Logger logger = LoggerFactory.getLogger(EventSource.class);
final private String name;
public EventSource(App app, String name) {
this.name = name;
app.addEventSource(this);
}
/**
* Subscribe a streamable to this event source.
*
* @param aStream
*/
public void subscribeStream(Streamable aStream) {
logger.info("Subscribing stream: {} to event source: {}.", aStream.getName(), getName());
streamables.add(aStream);
}
/**
* Unsubscribe a streamable from this event source.
*
* @param stream
*/
public void unsubscribeStream(Streamable stream) {
logger.info("Unsubsubscribing stream: {} to event source: {}.", stream.getName(), getName());
streamables.remove(stream);
}
/**
* Send an event to all the subscribed streamables.
*
* @param event
*/
@Override
public void put(Event event) {
for (Streamable<Event> stream : streamables) {
stream.put(event);
}
}
/**
*
* @return the number of streamables subscribed to this event source.
*/
public int getNumSubscribers() {
return streamables.size();
}
/**
* @return the name of this event source.
*/
public String getName() {
return name;
}
/**
* Close all the streamables subscribed to this event source.
*/
@Override
public void close() {
for (Streamable stream : streamables) {
logger.info("Closing stream: {} in event source: {}.", stream.getName(), getName());
stream.close();
}
}
/**
*
* @return the set of streamables subscribed to this event source.
*/
public Set<Streamable> getStreamables() {
return streamables;
}
@Override
public void start() {
// TODO Auto-generated method stub
}
}