/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package backtype.storm.utils;

import backtype.storm.metric.api.IStatefulObject;

import com.lmax.disruptor.AlertException;
import com.lmax.disruptor.ClaimStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.SingleThreadedClaimStrategy;
import com.lmax.disruptor.WaitStrategy;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * A single-consumer queue that uses the LMAX Disruptor. The key to its performance is
 * that a consumer which has fallen behind can catch up to the producer by processing
 * tuples in batches.
 *
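 * <p>A minimal usage sketch, not part of this class. The {@code MultiThreadedClaimStrategy}
 * and {@code BlockingWaitStrategy} below are the stock Disruptor 2.x implementations;
 * {@code someTuple} and the handler body are placeholders for the caller's own objects
 * and processing.
 * <pre>{@code
 * DisruptorQueue q = new DisruptorQueue("example-queue",
 *         new MultiThreadedClaimStrategy(1024), new BlockingWaitStrategy());
 * q.consumerStarted();    // consumer side: subsequent publishes go straight to the ring buffer
 * q.publish(someTuple);   // producer side
 * q.consumeBatch(new EventHandler<Object>() {
 *     public void onEvent(Object o, long sequence, boolean batchEnd) {
 *         // handle o; batchEnd is true for the last event in the batch
 *     }
 * });
 * }</pre>
 */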
public class DisruptorQueue implements IStatefulObject {
    // Sentinel objects published through the ring buffer to signal the consumer.
    static final Object FLUSH_CACHE = new Object();
    static final Object INTERRUPT = new Object();

    RingBuffer<MutableObject> _buffer;
    Sequence _consumer;
    SequenceBarrier _barrier;

    // TODO: consider having a threadlocal cache of this variable to speed up reads?
    volatile boolean consumerStartedFlag = false;
    // Objects published before the consumer has started; drained when the consumer
    // processes the FLUSH_CACHE sentinel.
    ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue<Object>();

    // Guards _cache against a publish racing with consumerStarted().
    private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
    private final Lock readLock = cacheLock.readLock();
    private final Lock writeLock = cacheLock.writeLock();

    private static final String PREFIX = "disruptor-";
    private String _queueName = "";

    public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait) {
        this._queueName = PREFIX + queueName;
        _buffer = new RingBuffer<MutableObject>(new ObjectEventFactory(), claim, wait);
        _consumer = new Sequence();
        _barrier = _buffer.newBarrier();
        _buffer.setGatingSequences(_consumer);
        if (claim instanceof SingleThreadedClaimStrategy) {
            // Single-threaded claim strategy: publishes go straight to the ring buffer from the start.
            consumerStartedFlag = true;
        } else {
            // Make sure we flush the pending messages in the cache first: FLUSH_CACHE is the
            // first event the consumer will see, and processing it drains _cache.
            try {
                publishDirect(FLUSH_CACHE, true);
            } catch (InsufficientCapacityException e) {
                throw new RuntimeException("This code should be unreachable!", e);
            }
        }
    }

    public String getName() {
        return _queueName;
    }

    // Consume everything that has been published up to the current cursor, without blocking.
    public void consumeBatch(EventHandler<Object> handler) {
        consumeBatchToCursor(_barrier.getCursor(), handler);
    }

    // Publish the INTERRUPT sentinel; the consumer rethrows it as an InterruptedException
    // (surfaced to the caller wrapped in a RuntimeException).
    public void haltWithInterrupt() {
        publish(INTERRUPT);
    }

    // Wait up to 10 ms for at least one new event, then consume whatever batch is available.
    public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
        try {
            final long nextSequence = _consumer.get() + 1;
            final long availableSequence = _barrier.waitFor(nextSequence, 10, TimeUnit.MILLISECONDS);
            if (availableSequence >= nextSequence) {
                consumeBatchToCursor(availableSequence, handler);
            }
        } catch (AlertException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
        for (long curr = _consumer.get() + 1; curr <= cursor; curr++) {
            try {
                MutableObject mo = _buffer.get(curr);
                Object o = mo.o;
                mo.setObject(null);
                if (o == FLUSH_CACHE) {
                    // Drain everything that was cached before the consumer started.
                    Object c = null;
                    while (true) {
                        c = _cache.poll();
                        if (c == null) break;
                        else handler.onEvent(c, curr, true);
                    }
                } else if (o == INTERRUPT) {
                    throw new InterruptedException("Disruptor processing interrupted");
                } else {
                    handler.onEvent(o, curr, curr == cursor);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        // TODO: only set this if the consumer cursor has changed?
        _consumer.set(cursor);
    }

    /*
     * Caches the published object until consumerStarted() is called, at which point the
     * cache is flushed to the consumer.
     */
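    // Illustrative timeline (a sketch, assuming a multi-threaded claim strategy as in the
    // constructor's else-branch): publish(A) before consumerStarted() puts A in _cache;
    // after consumerStarted(), publish(B) goes straight to the ring buffer behind the
    // FLUSH_CACHE sentinel published by the constructor, so the consumer delivers A
    // (while draining the cache) and then B.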
    public void publish(Object obj) {
        try {
            publish(obj, true);
        } catch (InsufficientCapacityException ex) {
            // Blocking publishes never report insufficient capacity.
            throw new RuntimeException("This code should be unreachable!", ex);
        }
    }

    public void tryPublish(Object obj) throws InsufficientCapacityException {
        publish(obj, false);
    }

    public void publish(Object obj, boolean block) throws InsufficientCapacityException {
        boolean publishNow = consumerStartedFlag;
        if (!publishNow) {
            readLock.lock();
            try {
                // Double-check the flag under the read lock so cache adds cannot race
                // with consumerStarted().
                publishNow = consumerStartedFlag;
                if (!publishNow) {
                    _cache.add(obj);
                }
            } finally {
                readLock.unlock();
            }
        }
        if (publishNow) {
            publishDirect(obj, block);
        }
    }

    private void publishDirect(Object obj, boolean block) throws InsufficientCapacityException {
        final long id;
        if (block) {
            id = _buffer.next();
        } else {
            id = _buffer.tryNext(1);
        }
        final MutableObject m = _buffer.get(id);
        m.setObject(obj);
        _buffer.publish(id);
    }

    public void consumerStarted() {
        consumerStartedFlag = true;
        // Use the writeLock to make sure all pending cache add operations have completed.
        writeLock.lock();
        writeLock.unlock();
    }

    public long population() { return (writePos() - readPos()); }
    public long capacity() { return _buffer.getBufferSize(); }
    public long writePos() { return _buffer.getCursor(); }
    public long readPos() { return _consumer.get(); }
    public float pctFull() { return (1.0F * population() / capacity()); }

    @Override
    public Object getState() {
        Map<String, Object> state = new HashMap<String, Object>();
        // Read readPos before writePos: both only increase, so pairing the older (smaller)
        // read position with the newer (larger) write position means the reported
        // population is never an under-estimate.
        long rp = readPos();
        long wp = writePos();
        state.put("capacity", capacity());
        state.put("population", wp - rp);
        state.put("write_pos", wp);
        state.put("read_pos", rp);
        return state;
    }

    public static class ObjectEventFactory implements EventFactory<MutableObject> {
        @Override
        public MutableObject newInstance() {
            return new MutableObject();
        }
    }
}