/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.camel.component.disruptor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicMarkableReference;
import java.util.concurrent.locks.LockSupport;

import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

import org.apache.camel.Exchange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Holder for Disruptor references.
* <p/>
* This is used to keep track of the usages of the Disruptors, so we know when a Disruptor is no longer in use, and
* can safely be discarded.
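* <p/>
* Illustrative lifecycle (the surrounding wiring is assumed here, not prescribed by this class):
* <pre>
* DisruptorReference ref = ...; // obtained from the DisruptorComponent
* ref.addEndpoint(endpoint);    // register an interested endpoint
* ref.publish(exchange);        // producers publish onto the current RingBuffer
* ref.removeEndpoint(endpoint); // removing the last endpoint shuts the Disruptor down
* </pre>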
*/
public class DisruptorReference {
private static final Logger LOGGER = LoggerFactory.getLogger(DisruptorReference.class);
private final Set<DisruptorEndpoint> endpoints = Collections
.newSetFromMap(new WeakHashMap<DisruptorEndpoint, Boolean>(4));
private final DisruptorComponent component;
private final String uri;
private final String name;
//The mark on the reference indicates if we are in the process of reconfiguring the Disruptor:
//(ref, mark) : Description
//(null, false) : not started or completely shut down
//(null, true) : in process of reconfiguring
//( x , false) : normally functioning Disruptor
//( x , true) : never set
private final AtomicMarkableReference<Disruptor<ExchangeEvent>> disruptor = new AtomicMarkableReference<Disruptor<ExchangeEvent>>(null, false);
private final DelayedExecutor delayedExecutor = new DelayedExecutor();
private final DisruptorProducerType producerType;
private final int size;
private final DisruptorWaitStrategy waitStrategy;
private final Queue<Exchange> temporaryExchangeBuffer;
//access guarded by this
private ExecutorService executor;
private LifecycleAwareExchangeEventHandler[] handlers = new LifecycleAwareExchangeEventHandler[0];
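//the number of unique consumers (DisruptorConsumer keys) registered on the current Disruptor,
//passed to ExchangeEvent.setExchange with every published exchange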
private int uniqueConsumerCount;
DisruptorReference(final DisruptorComponent component, final String uri, final String name, final int size,
final DisruptorProducerType producerType, final DisruptorWaitStrategy waitStrategy) throws Exception {
this.component = component;
this.uri = uri;
this.name = name;
this.size = size;
this.producerType = producerType;
this.waitStrategy = waitStrategy;
temporaryExchangeBuffer = new ArrayBlockingQueue<Exchange>(size);
reconfigure();
}
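/**
* Returns true when no active Disruptor is currently referenced: either this reference was never started, it has
* been shut down, or it is in the middle of a reconfiguration.
*/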
public boolean hasNullReference() {
return disruptor.getReference() == null;
}
private Disruptor<ExchangeEvent> getCurrentDisruptor() throws DisruptorNotStartedException {
Disruptor<ExchangeEvent> currentDisruptor = disruptor.getReference();
if (currentDisruptor == null) {
// no current Disruptor reference; we may be reconfiguring or it was not started
// check which by looking at the reference mark...
boolean[] changeIsPending = new boolean[1];
while (currentDisruptor == null) {
currentDisruptor = disruptor.get(changeIsPending);
//Check if we are reconfiguring
if (currentDisruptor == null && !changeIsPending[0]) {
throw new DisruptorNotStartedException(
"Disruptor is not yet started or already shut down.");
} else if (currentDisruptor == null && changeIsPending[0]) {
//We should be back shortly...keep trying but spare CPU resources
LockSupport.parkNanos(1L);
}
}
}
return currentDisruptor;
}
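/**
* Publishes the exchange on the ring buffer without blocking when it is full.
*
* @throws InsufficientCapacityException if the ring buffer has no free slot available
* @throws DisruptorNotStartedException if the Disruptor is not yet started or already shut down
*/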
public void tryPublish(final Exchange exchange) throws DisruptorNotStartedException, InsufficientCapacityException {
tryPublishExchangeOnRingBuffer(exchange, getCurrentDisruptor().getRingBuffer());
}
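/**
* Publishes the exchange on the ring buffer, blocking until a slot becomes available.
*
* @throws DisruptorNotStartedException if the Disruptor is not yet started or already shut down
*/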
public void publish(final Exchange exchange) throws DisruptorNotStartedException {
publishExchangeOnRingBuffer(exchange, getCurrentDisruptor().getRingBuffer());
}
private void publishExchangeOnRingBuffer(final Exchange exchange,
final RingBuffer<ExchangeEvent> ringBuffer) {
final long sequence = ringBuffer.next();
ringBuffer.get(sequence).setExchange(exchange, uniqueConsumerCount);
ringBuffer.publish(sequence);
}
private void tryPublishExchangeOnRingBuffer(final Exchange exchange, final RingBuffer<ExchangeEvent> ringBuffer) throws InsufficientCapacityException {
final long sequence = ringBuffer.tryNext();
ringBuffer.get(sequence).setExchange(exchange, uniqueConsumerCount);
ringBuffer.publish(sequence);
}
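/**
* Swaps the current Disruptor for a newly configured one: the old Disruptor is shut down, a new one is created
* and started, and any exchanges buffered in the meantime are republished on the new RingBuffer.
*/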
public synchronized void reconfigure() throws Exception {
LOGGER.debug("Reconfiguring disruptor {}", this);
shutdownDisruptor(true);
start();
}
private void start() throws Exception {
LOGGER.debug("Starting disruptor {}", this);
Disruptor<ExchangeEvent> newDisruptor = createDisruptor();
newDisruptor.start();
if (executor != null) {
//and use our delayed executor to actually execute the event handlers now
delayedExecutor.executeDelayedCommands(executor);
}
//make sure all event handlers are correctly started before we continue
for (final LifecycleAwareExchangeEventHandler handler : handlers) {
boolean eventHandlerStarted = false;
while (!eventHandlerStarted) {
try {
//The disruptor start command executed above should have triggered a start signal to all
//event processors which, as they start up, should notify our event handlers. They respond by
//switching a latch and we want to await that latch here to make sure they are started.
if (!handler.awaitStarted(10, TimeUnit.SECONDS)) {
//we wait for a relatively long, but limited amount of time to prevent an application using
//this component from hanging indefinitely
//Please report a bug if you can reproduce this
LOGGER.error("Disruptor/event handler failed to start properly, PLEASE REPORT");
}
eventHandlerStarted = true;
} catch (InterruptedException e) {
//just retry
}
}
}
publishBufferedExchanges(newDisruptor);
disruptor.set(newDisruptor, false);
}
private Disruptor<ExchangeEvent> createDisruptor() throws Exception {
//create a new Disruptor
final Disruptor<ExchangeEvent> newDisruptor = new Disruptor<ExchangeEvent>(
ExchangeEventFactory.INSTANCE, size, delayedExecutor, producerType.getProducerType(),
waitStrategy.createWaitStrategyInstance());
//determine the list of event handlers to be associated with the Disruptor
final ArrayList<LifecycleAwareExchangeEventHandler> eventHandlers = new ArrayList<LifecycleAwareExchangeEventHandler>();
uniqueConsumerCount = 0;
for (final DisruptorEndpoint endpoint : endpoints) {
final Map<DisruptorConsumer, Collection<LifecycleAwareExchangeEventHandler>> consumerEventHandlers = endpoint.createConsumerEventHandlers();
if (consumerEventHandlers != null) {
uniqueConsumerCount += consumerEventHandlers.keySet().size();
for (Collection<LifecycleAwareExchangeEventHandler> lifecycleAwareExchangeEventHandlers : consumerEventHandlers
.values()) {
eventHandlers.addAll(lifecycleAwareExchangeEventHandlers);
}
}
}
LOGGER.debug("Disruptor created with {} event handlers", eventHandlers.size());
handleEventsWith(newDisruptor,
eventHandlers.toArray(new LifecycleAwareExchangeEventHandler[eventHandlers.size()]));
return newDisruptor;
}
private void handleEventsWith(Disruptor<ExchangeEvent> newDisruptor,
final LifecycleAwareExchangeEventHandler[] newHandlers) {
if (newHandlers == null || newHandlers.length == 0) {
handlers = new LifecycleAwareExchangeEventHandler[1];
handlers[0] = new BlockingExchangeEventHandler();
} else {
handlers = newHandlers;
}
resizeThreadPoolExecutor(handlers.length);
newDisruptor.handleEventsWith(handlers);
}
private void publishBufferedExchanges(Disruptor<ExchangeEvent> newDisruptor) {
//now empty out all buffered Exchanges if we had any
final List<Exchange> exchanges = new ArrayList<Exchange>(temporaryExchangeBuffer.size());
while (!temporaryExchangeBuffer.isEmpty()) {
exchanges.add(temporaryExchangeBuffer.remove());
}
RingBuffer<ExchangeEvent> ringBuffer = newDisruptor.getRingBuffer();
//and offer them again to our new ring buffer
for (final Exchange exchange : exchanges) {
publishExchangeOnRingBuffer(exchange, ringBuffer);
}
}
private void resizeThreadPoolExecutor(final int newSize) {
if (executor == null && newSize > 0) {
LOGGER.debug("Creating new executor with {} threads", newSize);
//no thread pool executor yet, create a new one
executor = component.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, uri,
newSize);
} else if (executor != null && newSize <= 0) {
LOGGER.debug("Shutting down executor");
//we need to shut down our executor
component.getCamelContext().getExecutorServiceManager().shutdown(executor);
executor = null;
} else if (executor instanceof ThreadPoolExecutor) {
LOGGER.debug("Resizing existing executor to {} threads", newSize);
//our thread pool executor is of type ThreadPoolExecutor, so we know how to resize it
final ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor)executor;
threadPoolExecutor.setCorePoolSize(newSize);
threadPoolExecutor.setMaximumPoolSize(newSize);
} else if (newSize > 0) {
LOGGER.debug("Shutting down old and creating new executor with {} threads", newSize);
//hmmm...no idea what kind of executor this is...just kill it and start fresh
component.getCamelContext().getExecutorServiceManager().shutdown(executor);
executor = component.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, uri,
newSize);
}
}
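/**
* Shuts down the current Disruptor and awaits completion of all its event handlers. The reference mark is set to
* the given flag so that concurrent publishers either spin until a reconfiguration completes (true) or fail with
* a DisruptorNotStartedException (false).
*/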
private synchronized void shutdownDisruptor(boolean isReconfiguring) {
LOGGER.debug("Shutting down disruptor {}, reconfiguring: {}", this, isReconfiguring);
Disruptor<ExchangeEvent> currentDisruptor = disruptor.getReference();
disruptor.set(null, isReconfiguring);
if (currentDisruptor != null) {
//check if we had a blocking event handler to keep an empty disruptor 'busy'
if (handlers != null && handlers.length == 1
&& handlers[0] instanceof BlockingExchangeEventHandler) {
// yes we did, unblock it so we can get rid of our backlog,
// The event handler will empty its pending exchanges into our temporary buffer
final BlockingExchangeEventHandler blockingExchangeEventHandler = (BlockingExchangeEventHandler)handlers[0];
blockingExchangeEventHandler.unblock();
}
currentDisruptor.shutdown();
//shutting down the disruptor above has already given them a trigger to halt when they are done
//we do however want to await their completion before they are scheduled to process events from the
//new RingBuffer
for (final LifecycleAwareExchangeEventHandler eventHandler : handlers) {
boolean eventHandlerFinished = false;
//the disruptor is now empty and all consumers are either done or busy processing their last exchange
while (!eventHandlerFinished) {
try {
//The disruptor shutdown command executed above should have triggered a halt signal to all
//event processors which, in their death, should notify our event handlers. They respond by
//switching a latch and we want to await that latch here to make sure they are done.
if (!eventHandler.awaitStopped(10, TimeUnit.SECONDS)) {
//we wait for a relatively long, but limited amount of time to prevent an application using
//this component from hanging indefinitely
//Please report a bug if you can reproduce this
LOGGER.error("Disruptor/event handler failed to shut down properly, PLEASE REPORT");
}
eventHandlerFinished = true;
} catch (InterruptedException e) {
//just retry
}
}
}
handlers = new LifecycleAwareExchangeEventHandler[0];
}
}
private synchronized void shutdownExecutor() {
resizeThreadPoolExecutor(0);
}
public String getName() {
return name;
}
public long getRemainingCapacity() throws DisruptorNotStartedException {
return getCurrentDisruptor().getRingBuffer().remainingCapacity();
}
public DisruptorWaitStrategy getWaitStrategy() {
return waitStrategy;
}
DisruptorProducerType getProducerType() {
return producerType;
}
public int getBufferSize() {
return size;
}
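/**
* Returns the number of exchanges waiting to be processed: the occupied slots of the ring buffer (buffer size
* minus remaining capacity) plus any exchanges held in the temporary buffer, or the temporary buffer size alone
* when no Disruptor is currently active.
*/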
public int getPendingExchangeCount() {
try {
if (!hasNullReference()) {
return (int)(getBufferSize() - getRemainingCapacity() + temporaryExchangeBuffer.size());
}
} catch (DisruptorNotStartedException e) {
//fall through...
}
return temporaryExchangeBuffer.size();
}
public synchronized void addEndpoint(final DisruptorEndpoint disruptorEndpoint) {
LOGGER.debug("Adding Endpoint: " + disruptorEndpoint);
endpoints.add(disruptorEndpoint);
LOGGER.debug("Endpoint added: {}, new total endpoints {}", disruptorEndpoint, endpoints.size());
}
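/**
* Unregisters the given endpoint; when it is the last one referencing this Disruptor, the Disruptor and its
* thread pool executor are shut down.
*/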
public synchronized void removeEndpoint(final DisruptorEndpoint disruptorEndpoint) {
LOGGER.debug("Removing Endpoint: " + disruptorEndpoint);
if (getEndpointCount() == 1) {
LOGGER.debug("Last Endpoint removed, shutdown disruptor");
//Shutdown our disruptor
shutdownDisruptor(false);
//As there are no endpoints dependent on this Disruptor, we may also shutdown our executor
shutdownExecutor();
}
endpoints.remove(disruptorEndpoint);
LOGGER.debug("Endpoint removed: {}, new total endpoints {}", disruptorEndpoint, endpoints.size());
}
public synchronized int getEndpointCount() {
return endpoints.size();
}
@Override
public String toString() {
return "DisruptorReference{" + "uri='" + uri + '\'' + ", endpoint count=" + endpoints.size()
+ ", handler count=" + handlers.length + '}';
}
/**
* Implementation of the {@link LifecycleAwareExchangeEventHandler} interface that blocks all calls to the #onEvent
* method until the #unblock method is called.
*/
private class BlockingExchangeEventHandler extends AbstractLifecycleAwareExchangeEventHandler {
private final CountDownLatch blockingLatch = new CountDownLatch(1);
@Override
public void onEvent(final ExchangeEvent event, final long sequence, final boolean endOfBatch) throws Exception {
blockingLatch.await();
final Exchange exchange = event.getSynchronizedExchange().cancelAndGetOriginalExchange();
if (exchange.getProperty(DisruptorEndpoint.DISRUPTOR_IGNORE_EXCHANGE, false, boolean.class)) {
// Property was set and it was set to true, so don't process Exchange.
LOGGER.trace("Ignoring exchange {}", exchange);
} else {
temporaryExchangeBuffer.offer(exchange);
}
}
public void unblock() {
blockingLatch.countDown();
}
}
/**
* When a consumer is added or removed, we need to create a new Disruptor due to its static configuration. However, we
* would like to reuse our thread pool executor and only add or remove the threads we need. On a reconfiguration of the
* Disruptor, we need to atomically swap the current RingBuffer with a new and fully configured one in order to keep
* the producers operational without the risk of losing messages. Configuration of a RingBuffer by the Disruptor's
* start method has a side effect that immediately starts execution of the event processors (consumers) on the
* Executor passed as a constructor argument which is stored in a final field. In order to be able to delay actual
* execution of the event processors until the event processors of the previous RingBuffer are done processing and the
* thread pool executor has been resized to match the new consumer count, this class queues the submitted commands
* until executeDelayedCommands is invoked.
*/
private static class DelayedExecutor implements Executor {
private final Queue<Runnable> delayedCommands = new LinkedList<Runnable>();
@Override
public void execute(final Runnable command) {
delayedCommands.offer(command);
}
public void executeDelayedCommands(final Executor actualExecutor) {
Runnable command;
while ((command = delayedCommands.poll()) != null) {
actualExecutor.execute(command);
}
}
}
}