blob: 7ea3cb7806307a7797f2e34c749962ffa81d8b64 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. The ASF licenses this file to You
* under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. For additional information regarding
* copyright in this work, please see the NOTICE file in the top level
* directory of this distribution.
*/
package org.apache.abdera2.common.pusher;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class SimplePusher<T>
extends AbstractPusher<T> implements Pusher<T>, Receiver<T> {
private final static Log log = LogFactory.getLog(SimplePusher.class);
private final Queue<T> queue = new ConcurrentLinkedQueue<T>();
private final ThreadPoolExecutor exec =
(ThreadPoolExecutor) Executors.newCachedThreadPool();
private final Set<Listener<T>> listeners =
new HashSet<Listener<T>>();
public void startListening(Listener<T> listener) {
listener.beforeItems();
listeners.add(listener);
}
public void stopListening(Listener<T> listener) {
listener.afterItems();
listeners.remove(listener);
}
public void push(T entry) {
queue.add(entry);
}
public boolean isRunning() {
return !exec.isShutdown() &&
!exec.isTerminated();
}
public SimplePusher() {
exec.execute(
new Runnable() {
public void run() {
while(isRunning()) {
if (!queue.isEmpty()) {
final T el = queue.poll();
if (el != null) {
log.info("Processing item...");
try {
exec.execute(new Runnable() {
public void run() {
for (final Listener<T> l : listeners) {
try {
exec.execute(new Runnable() {
public void run() {
try {
l.onItem(el);
} catch (Throwable t) {
log.error(t);
}
}
});
} catch (Throwable t) {
log.error(t);
}
}
}
});
} catch (Throwable t) {
log.error(t);
}
}
}
}
}
}
);
}
public void shutdown() {
ThreadPoolExecutor exec = (ThreadPoolExecutor) this.exec;
exec.shutdown();
for (Listener<?> listener : listeners)
listener.afterItems();
listeners.clear(); // remove all the listeners
}
}