blob: 2087085dc98a0c923cfbc3f01a8269e8b81d686e [file] [log] [blame]
package org.apache.archiva.event;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
/**
* Simple Async Event Bus implementation
*
* @author jdumay
*/
public class AsynchronousEventBus implements EventBus
{
private final Set<EventObserver> observers = Collections.synchronizedSet(new HashSet());
private final BlockingQueue<Event> events = new LinkedBlockingQueue<Event>();
private final Thread workerThread;
private final int threads;
public AsynchronousEventBus(int threads)
{
this.threads = threads;
workerThread = new Thread(new WorkerRunnable());
workerThread.start();
}
public void emit(EventEmitter emitter, EventMessage message)
{
events.offer(new Event(emitter, message));
}
public void subscribe(EventObserver observer)
{
observers.add(observer);
}
public void unsubscribe(EventObserver observer)
{
observers.remove(observer);
}
public Set<EventObserver> getObservers() {
return new HashSet<EventObserver>(observers);
}
class WorkerRunnable implements Runnable
{
private final ExecutorService service;
public WorkerRunnable()
{
service = Executors.newFixedThreadPool(threads);
}
public void run()
{
while (true)
{
dequeueAndExecute();
}
}
private void dequeueAndExecute()
{
try
{
final Event event = events.take();
for (final EventObserver observer : observers)
{
service.execute(new Runnable()
{
public void run()
{
try
{
observer.observe(event);
}
finally
{
//log me
}
}
});
}
}
catch (InterruptedException e)
{
//Do nothing
}
}
}
}