blob: 1576044ef93e66832174d1965f2b75a31a5e7e3c [file] [log] [blame]
/*
* Copyright (c) 2011 Yahoo! Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific
* language governing permissions and limitations under the
* License. See accompanying LICENSE file.
*/
package org.apache.s4.core;
import java.util.Collection;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import net.jcip.annotations.ThreadSafe;
import org.apache.s4.base.Event;
import org.apache.s4.core.gen.OverloadDispatcher;
import org.apache.s4.core.gen.OverloadDispatcherGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;
/**
* <p>
* Base class for implementing processing in S4. All instances are organized as follows:
* <ul>
* <li>A PE prototype is a special type of instance that, along with {@link Stream} defines the topology of the
* application graph.
* <li>PE prototypes manage the creation and destruction of PE instances.
* <li>All PE instances are clones of a PE prototype.
* <li>PE instances are associated with a unique key.
* <li>PE instances do the actual work by processing any number of input events of various types and emit output events
* of various types.
* <li>To process events, {@code ProcessingElement} dynamically matches an event type to a processing method. See
* {@link org.apache.s4.core.gen.OverloadDispatcher} . There are two types of processing methods:
* <ul>
* <li>{@code onEvent(SomeEvent event)} When implemented, input events of type {@code SomeEvent} will be dispatched to
* this method.
* <li>{@code onTrigger(AnotherEvent event)} When implemented, input events of type {@code AnotherEvent} will be
* dispatched to this method when certain conditions are met. See {@link #setTrigger(Class, int, long, TimeUnit)}.
* </ul>
* <li>
* A PE implementation must not create threads. A periodic task can be implemented by overriding the {@link #onTime()}
* method. See {@link #setTimerInterval(long, TimeUnit)}.
* <li>If a reference in the PE prototype is shared by the PE instances, the referenced object must be thread safe.
* <li>The code in a PE instance is synchronized by the framework to avoid concurrency problems.
* <li>In some special cases, it may be desirable to allow concurrency in the PE instance. For example, there may be
* several event processing methods that can safely run concurrently. To enable concurrency, annotate the implementation
* of {@code ProcessingElement} with {@link ThreadSafe}.
* <li>PE instances never use the constructor. They must be initialized by implementing the {@link #onCreate()} method.
* <li>PE class fields are cloned from the prototype. The copy is shallow: references are copied, which means that if the
* prototype creates a collection object, all instances will share the same collection object, which is usually
* <em>NOT</em> what the programmer intended. The application developer is responsible for initializing such objects in
* the {@link #onCreate()} method. For example, if each instance requires its own
* {@code Map} object, the PE should implement the following:
* <pre>
* {@code
* public class MyPE extends ProcessingElement {
*
* private Map<String, Integer> wordCount;
*
* ...
*
* protected void onCreate() {
* wordCount = new HashMap<String, Integer>();
* logger.trace("Created a map for instance PE with id {}", getId());
* }
* }
* }
* </pre>
*
*
* </ul>
*
*
*
*
*/
public abstract class ProcessingElement implements Cloneable {

    private static final Logger logger = LoggerFactory.getLogger(ProcessingElement.class);

    /* Cache key under which the one and only instance of a singleton PE is stored. */
    private static final String SINGLETON = "singleton";

    protected App app;

    /*
     * This map holds all the instances. We make it package private to prevent concrete classes from updating the
     * collection. Missing instances are created on demand by the cache loader (see createPE).
     */
    Cache<String, ProcessingElement> peInstances;

    /*
     * Trigger configuration keyed by event type. It is initialized in the prototype; every instance receives its own
     * copy of the map AND of each Trigger, so trigger state (event counts, timestamps) is tracked per instance.
     */
    Map<Class<? extends Event>, Trigger> triggers;

    /* PE instance id. */
    String id = "";

    /* Private fields. */
    private ProcessingElement pePrototype;
    private boolean haveTriggers = false;
    private long timerIntervalInMilliseconds = 0;
    private Timer timer;
    private boolean isPrototype = true;
    private boolean isThreadSafe = false;
    // NOTE(review): not read anywhere in this class.
    private boolean isFirst = true;
    private boolean isSingleton = false;
    private transient OverloadDispatcher overloadDispatcher;

    /**
     * Creates a PE prototype: generates the event dispatcher for the concrete class and sets up an instance cache whose
     * entries never expire.
     */
    protected ProcessingElement() {
        OverloadDispatcherGenerator oldg = new OverloadDispatcherGenerator(this.getClass());
        Class<?> overloadDispatcherClass = oldg.generate();
        try {
            overloadDispatcher = (OverloadDispatcher) overloadDispatcherClass.newInstance();
        } catch (Exception e) {
            throw new RuntimeException("Could not instantiate the overload dispatcher for " + getClass().getName(), e);
        }

        peInstances = CacheBuilder.newBuilder().build(newPELoader());

        triggers = new MapMaker().makeMap();

        /*
         * Only the PE Prototype uses the constructor. The PEPrototype field will be cloned by the instances and point
         * to the prototype.
         */
        this.pePrototype = this;
    }

    /**
     * Create a PE prototype. By default, PE instances will never expire. Use {@link #setPECache(int, long, TimeUnit)}
     * to configure.
     *
     * @param app
     *            the app that contains this PE
     */
    public ProcessingElement(App app) {
        this();
        setApp(app);
    }

    /**
     * This method is called by the PE timer for every PE instance. Unless the {@code ProcessingElement} subclass is
     * annotated with {@link ThreadSafe}, the call holds the PE instance lock, the same lock that serializes the
     * {@code onEvent()} and {@code onTrigger()} dispatch methods.
     *
     * Override this method to implement a periodic process.
     */
    protected void onTime() {
    }

    /**
     * This method is called after a PE instance is created. Use it to initialize fields that are PE instance specific.
     * PE instances are created using {@link #clone()}, which performs a shallow copy of the prototype.
     *
     * <p>
     * <b>Fields initialized in the class constructor are shared by all PE instances.</b>
     * </p>
     */
    abstract protected void onCreate();

    /**
     * This method is called before a PE instance is removed through {@link #close()}. Use it to close resources and
     * clean up.
     */
    abstract protected void onRemove();

    /**
     * PE objects must be associated with one and only one {@code App} object.
     *
     * @return the app
     */
    public App getApp() {
        return app;
    }

    /**
     * Associates this PE prototype with an application and registers it there. May only be called once.
     *
     * @param app
     *            the owning application; must not be null
     * @throws RuntimeException
     *             if an application was already assigned
     */
    public void setApp(App app) {
        if (this.app != null) {
            throw new RuntimeException("Application was already assigne to this processing element");
        }
        Preconditions.checkNotNull(app, "app must not be null.");
        this.app = app;
        app.addPEPrototype(this, null);
    }

    /**
     * Returns the approximate number of PE instances from the cache.
     *
     * @return the approximate number of PE instances.
     */
    public long getNumPEInstances() {
        return peInstances.size();
    }

    /* Live map view of the instance cache. */
    Map<String, ProcessingElement> getPEInstances() {
        return peInstances.asMap();
    }

    /**
     * Set PE expiration and cache size.
     * <p>
     * PE instances will be automatically removed from the cache once a fixed duration has elapsed after the PE's
     * creation, or last access.
     * <p>
     * Least accessed PEs will automatically be removed from the cache when the number of PEs approaches maximumSize.
     * <p>
     * When this method is called the previous cache is replaced, so all existing PE instances are discarded ({@link
     * #onRemove()} is not called for them).
     *
     * @param maximumSize
     *            the approximate maximum number of PEs in the cache.
     * @param duration
     *            the PE duration
     * @param timeUnit
     *            the time unit
     * @return the PE prototype
     */
    public ProcessingElement setPECache(int maximumSize, long duration, TimeUnit timeUnit) {
        Preconditions
                .checkArgument(isPrototype, "This method can only be used on the PE prototype. PE cache not set.");

        peInstances = CacheBuilder.newBuilder().expireAfterAccess(duration, timeUnit).maximumSize(maximumSize)
                .build(newPELoader());
        return this;
    }

    /**
     * Configures a trigger that calls {@code onTrigger(EventType event)} for events of {@code eventType}. Every event
     * of that type (or of a subclass that has no trigger of its own) arriving at a PE instance is checked, and the
     * trigger fires when either enabled condition holds:
     *
     * <ul>
     * <li>{@code numEvents} events have been checked since the trigger last fired (enabled when {@code numEvents > 0});
     * <li>more than {@code interval} has elapsed since the previous checked event (enabled when {@code interval > 0}).
     * </ul>
     *
     * <p>
     * Each PE instance keeps its own trigger state.
     *
     * @param eventType
     *            the type of event on which this trigger will fire.
     * @param numEvents
     *            number of events between trigger activations. Set to one to trigger on every input event, or to zero
     *            to rely on the time condition only.
     * @param interval
     *            time since the previous event after which the trigger fires. Set to zero if no time interval is
     *            needed.
     * @param timeUnit
     *            the TimeUnit for the argument interval. Can be null if no time interval is needed.
     * @return the PE prototype
     * @throws IllegalArgumentException
     *             if called on an instance, if both conditions are disabled, or if a positive interval has no unit
     */
    public ProcessingElement setTrigger(Class<? extends Event> eventType, int numEvents, long interval,
            TimeUnit timeUnit) {
        Preconditions.checkArgument(isPrototype, "This method can only be used on the PE prototype. Trigger not set.");
        Preconditions.checkNotNull(eventType, "Need eventType to set trigger.");
        Preconditions.checkArgument(numEvents > 0 || interval > 0,
                "To set trigger numEvent OR interval must be greater than zero.");
        Preconditions.checkArgument(timeUnit != null || interval < 1,
                "To set trigger timeUnit is needed when interval is greater than zero.");

        /* Skip trigger checking overhead if there are no triggers. */
        haveTriggers = true;

        /* Trigger works in milliseconds: convert FROM timeUnit TO milliseconds. */
        long intervalInMilliseconds = (timeUnit == null) ? interval : TimeUnit.MILLISECONDS.convert(interval,
                timeUnit);

        triggers.put(eventType, new Trigger(numEvents, intervalInMilliseconds));
        return this;
    }

    /**
     * @return true if this PE has been configured as a singleton
     */
    public boolean isSingleton() {
        return isSingleton;
    }

    /**
     * Makes this PE a singleton. A single PE instance is eagerly created when the application starts and receives all
     * events regardless of key. Has no effect (other than a warning) when called on a PE instance.
     *
     * @param isSingleton
     *            true to make this PE a singleton
     * @return the PE prototype (or this instance, unchanged, when called on an instance)
     */
    public ProcessingElement setSingleton(boolean isSingleton) {
        if (!isPrototype) {
            logger.warn("This method can only be used on the PE prototype.");
            return this;
        }
        this.isSingleton = isSingleton;
        return this;
    }

    /**
     * The duration of the periodic task controlled by the embedded timer.
     *
     * @param timeUnit
     *            the timeUnit of the returned value.
     * @return the timer interval.
     */
    public long getTimerInterval(TimeUnit timeUnit) {
        return timeUnit.convert(timerIntervalInMilliseconds, TimeUnit.MILLISECONDS);
    }

    /**
     * Set a timer that calls {@link #onTime()} on every PE instance. The timer starts when the application starts.
     *
     * If the interval is zero (or rounds down to zero milliseconds) the timer is disabled. Calling this method again
     * replaces any previously configured timer.
     *
     * @param interval
     *            in timeUnit; must not be negative
     * @param timeUnit
     *            the timeUnit of interval
     * @return the PE prototype
     * @throws IllegalArgumentException
     *             if called on an instance or if interval is negative
     */
    public ProcessingElement setTimerInterval(long interval, TimeUnit timeUnit) {
        Preconditions.checkArgument(isPrototype, "This method can only be used on the PE prototype. Timer not set.");
        Preconditions.checkArgument(interval >= 0, "Timer interval must not be negative.");

        timerIntervalInMilliseconds = TimeUnit.MILLISECONDS.convert(interval, timeUnit);

        if (timer != null) {
            timer.cancel();
            /* A cancelled Timer cannot be scheduled again; drop it so it is never started. */
            timer = null;
        }

        /* Timer.schedule rejects a zero period, so a zero interval means "no timer". */
        if (timerIntervalInMilliseconds == 0) {
            return this;
        }

        timer = new Timer();
        return this;
    }

    /**
     * Set to true if the concrete PE class has the {@link ThreadSafe} annotation. The default is false (no annotation).
     * In general, application developers don't need to worry about thread safety in the concrete PEs. In some cases the
     * PE needs to be thread safe to avoid deadlocks. For example, if the application graph has cycles and the queues
     * are allowed to block, then some critical PEs with multiple incoming streams need to be made thread safe to avoid
     * locking the entire PE instance.
     *
     * @return true if the PE implementation is considered thread safe.
     */
    public boolean isThreadSafe() {
        return isThreadSafe;
    }

    /**
     * Dispatches an incoming event to the matching {@code onEvent()} method and, when a configured trigger fires, to
     * the matching {@code onTrigger()} method. Unless the PE is thread safe, dispatch holds this instance's lock.
     */
    protected void handleInputEvent(Event event) {
        if (isThreadSafe) {
            /* Thread-safe PEs may process events concurrently: no locking. */
            dispatchEventAndTrigger(event);
        } else {
            synchronized (this) {
                dispatchEventAndTrigger(event);
            }
        }
    }

    /* Calls onEvent() and, if the trigger for this event type fires, onTrigger(). */
    private void dispatchEventAndTrigger(Event event) {
        /* Dispatch onEvent() method. */
        overloadDispatcher.dispatchEvent(this, event);

        /* Dispatch onTrigger() method. */
        if (haveTriggers && isTrigger(event)) {
            overloadDispatcher.dispatchTrigger(this, event);
        }
    }

    private boolean isTrigger(Event event) {
        return isTrigger(event, event.getClass());
    }

    /**
     * Checks the trigger for this event type. Creates an inactive trigger if no trigger is found after recursively
     * exploring the event class hierarchy. An inactive trigger never triggers.
     *
     * @return true if trigger is reached, false if trigger is not ready yet or if trigger is inactive
     */
    private boolean isTrigger(Event event, Class<?> triggerClass) {
        /* Check if there is a trigger for this event type or one of its Event superclasses. */
        Trigger trigger = triggers.get(triggerClass);
        if (trigger == null) {
            if (!Event.class.isAssignableFrom(triggerClass)) {
                // Walked past the Event hierarchy: cache an inactive marker so the lookup is not repeated.
                triggers.put(event.getClass(), new Trigger());
                return false;
            } else {
                // further explore hierarchy
                return isTrigger(event, triggerClass.getSuperclass());
            }
        } else {
            /*
             * Check if it is time to activate the trigger for this event type.
             */
            return trigger.checkAndUpdate();
        }
    }

    /* Calls onRemove() on this object, then removes the entry for id from the instance cache. */
    private void removeInstanceForKeyInternal(String id) {
        if (id == null) {
            return;
        }

        /* First let the PE instance clean after itself. */
        onRemove();

        /* Remove PE instance. */
        peInstances.invalidate(id);
    }

    /**
     * Stops the prototype's timer (if any) and removes all PE instances from the cache. {@link #onRemove()} is not
     * called for the removed instances.
     */
    protected void removeAll() {
        /* Close resources in prototype. */
        if (timer != null) {
            timer.cancel();
            logger.info("Timer stopped.");
        }

        /* Remove all the instances. */
        peInstances.invalidateAll();
    }

    /**
     * Removes this PE instance: calls {@link #onRemove()} and then evicts it from the instance cache.
     */
    protected void close() {
        removeInstanceForKeyInternal(id);
    }

    /* Cache loader callback: clones the prototype into a new instance for the given key. */
    private ProcessingElement createPE(String id) {
        ProcessingElement pe = (ProcessingElement) this.clone();
        pe.isPrototype = false;
        pe.id = id;
        pe.triggers = copyTriggers();
        pe.onCreate();
        logger.trace("Num PE instances: {}.", getNumPEInstances());
        return pe;
    }

    /* Copies the trigger map AND each Trigger so that every instance counts its own events. */
    private Map<Class<? extends Event>, Trigger> copyTriggers() {
        Map<Class<? extends Event>, Trigger> copy = Maps.newHashMap(triggers);
        for (Map.Entry<Class<? extends Event>, Trigger> entry : copy.entrySet()) {
            entry.setValue(entry.getValue().copy());
        }
        return copy;
    }

    /* Builds the loader that lazily creates a PE instance (via createPE) the first time a key is requested. */
    private CacheLoader<String, ProcessingElement> newPELoader() {
        return new CacheLoader<String, ProcessingElement>() {
            @Override
            public ProcessingElement load(String key) throws Exception {
                return createPE(key);
            }
        };
    }

    /* This method is called by App just before the application starts. */
    void initPEPrototypeInternal() {
        /*
         * Check the annotation FIRST: the singleton created below is a clone of this prototype and copies the
         * isThreadSafe flag at clone time.
         */
        if (getClass().isAnnotationPresent(ThreadSafe.class)) {
            // NOTE(review): an earlier TODO reported that isAnnotationPresent always returns false. It only sees
            // annotations retained at runtime; confirm the retention policy of net.jcip.annotations.ThreadSafe.
            isThreadSafe = true;
            logger.trace("Annotated with @ThreadSafe");
        }

        /* Eagerly create singleton PE. */
        if (isSingleton) {
            try {
                peInstances.get(SINGLETON);
                logger.trace("Created singleton [{}].", getInstanceForKey(SINGLETON));

            } catch (ExecutionException e) {
                logger.error("Problem when trying to create a PE instance.", e);
            }
        }

        /* Start timer. */
        if (timer != null) {
            timer.schedule(new OnTimeTask(), 0, timerIntervalInMilliseconds);
            logger.debug("Started timer for PE prototype [{}], ID [{}] with interval [{}].", new String[] {
                    this.getClass().getName(), id, String.valueOf(timerIntervalInMilliseconds) });
        }
    }

    /**
     * Returns the local PE instance for a key, creating it if it does not exist yet. For singleton PEs the key is
     * ignored and the singleton instance is returned. The returned instances are all in the same JVM. Do not use it to
     * access remote objects.
     *
     * @param id
     *            the instance key
     * @return the PE instance, or null if creating the instance failed (the failure is logged)
     */
    public ProcessingElement getInstanceForKey(String id) {
        /* Check if instance for key exists, otherwise create one. */
        try {
            if (isSingleton) {
                logger.trace(
                        "Requested a PE instance with key [{}]. The instance is a singleton and will ignore the key. The key should be set to null when requesting a singleton.",
                        id);
                return peInstances.get(SINGLETON);
            }
            return peInstances.get(id);
        } catch (ExecutionException e) {
            logger.error("Problem when trying to create a PE instance.", e);
        }
        return null;
    }

    /**
     * Get all the local instances (a live view of the instance cache). See notes in
     * {@link #getInstanceForKey(String)}.
     */
    public Collection<ProcessingElement> getInstances() {
        return peInstances.asMap().values();
    }

    /**
     * Looks up, in the local instance cache, the instance whose key equals this object's {@link #getId() id}. TODO: not
     * implemented for cluster configuration yet, use it only in single node configuration for testing apps.
     *
     * @return the PE instance, or null if it doesn't exist.
     */
    public ProcessingElement getRemoteInstancesForKey() {
        logger.warn("The getRemoteInstancesForKey() method is not implemented. Use "
                + "it to test your app in single node configuration only. Should work "
                + "transparently for remote objects once it is implemented.");

        return peInstances.asMap().get(id);
    }

    /**
     * This method returns an immutable map that contains all the PE instances for this prototype. PE instances may be
     * located anywhere in the cluster. Be aware that this could be an expensive operation. TODO: not implemented for
     * cluster configuration yet, use it only in single node configuration for testing apps.
     */
    public Map<String, ProcessingElement> getRemoteInstances() {
        logger.warn("The getRemoteInstances() method is not implemented. Use "
                + "it to test your app in single node configuration only. Should work "
                + "transparently for remote objects once it is implemented.");

        /*
         * For now we just return a copy as a placeholder. We need to implement a custom map capable of working on an S4
         * cluster as efficiently as possible.
         */
        return ImmutableMap.copyOf(peInstances.asMap());
    }

    /**
     * Unique ID for a PE instance.
     *
     * @return the id
     */
    public String getId() {
        return id;
    }

    /**
     * The {@code ProcessingElement} prototype for this object.
     *
     * @return the corresponding {@code ProcessingElement} for this instance.
     */
    public ProcessingElement getPrototype() {
        return pePrototype;
    }

    /**
     * Shallow copy used to create PE instances. Overridden to drop the checked {@link CloneNotSupportedException}.
     */
    @Override
    protected Object clone() {
        try {
            return super.clone();
        } catch (CloneNotSupportedException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Helper method to be used by PE implementation classes. Sends an event to all the target streams.
     *
     * @param event
     *            the event to send
     * @param streamArray
     *            the target streams; the event is put on each one in array order
     */
    protected <T extends Event> void emit(T event, Stream<T>[] streamArray) {
        for (Stream<T> stream : streamArray) {
            stream.put(event);
        }
    }

    /* Calls onTime() on every PE instance; scheduled by the prototype's timer. */
    private class OnTimeTask extends TimerTask {

        @Override
        public void run() {
            for (ProcessingElement peInstance : getPEInstances().values()) {
                try {
                    if (isThreadSafe) {
                        peInstance.onTime();
                    } else {
                        /* Same lock as handleInputEvent(): the PE instance itself. */
                        synchronized (peInstance) {
                            peInstance.onTime();
                        }
                    }
                } catch (Exception e) {
                    logger.error("Caught exception in timer when calling PE instance [{}] with id [{}].", peInstance,
                            peInstance.id);
                    logger.error("Timer error.", e);
                }
            }
        }
    }

    /*
     * Trigger state for one event type in one PE instance. Fires when the event-count limit or the time limit (each
     * enabled only when greater than zero) is reached.
     */
    class Trigger {

        final long intervalInMilliseconds;
        final int intervalInEvents;
        long lastTime;
        int eventCount;
        // inactive triggers never trigger anything, they are used as markers
        boolean active = true;

        /* Creates an inactive marker trigger. */
        Trigger() {
            this.intervalInEvents = 0;
            this.intervalInMilliseconds = 0;
            this.active = false;
        }

        Trigger(int intervalInEvents, long intervalInMilliseconds) {
            this.intervalInEvents = intervalInEvents;
            this.intervalInMilliseconds = intervalInMilliseconds;
        }

        /* Returns a trigger with the same configuration and active flag but fresh counters. */
        Trigger copy() {
            Trigger copy = new Trigger(intervalInEvents, intervalInMilliseconds);
            copy.active = active;
            return copy;
        }

        /*
         * Records one event and reports whether the trigger fires. A limit of zero (or less) disables that condition;
         * the event counter is reset whenever the trigger fires.
         */
        boolean checkAndUpdate() {
            if (!active) {
                return false;
            }

            long now = System.currentTimeMillis();
            long timeLapse = now - lastTime;
            lastTime = now;
            eventCount++;

            boolean intervalElapsed = intervalInMilliseconds > 0 && timeLapse > intervalInMilliseconds;
            boolean countReached = intervalInEvents > 0 && eventCount >= intervalInEvents;
            if (intervalElapsed || countReached) {
                eventCount = 0;
                return true;
            }
            return false;
        }

        boolean isActive() {
            return active;
        }
    }
}