blob: afa51586702a9775104d18aa5a8b973c12e6a1d6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.utils;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.AlertException;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.LiteBlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import org.apache.storm.Config;
import org.apache.storm.metric.api.IStatefulObject;
import org.apache.storm.metric.internal.RateTracker;
import org.apache.storm.metrics2.DisruptorMetrics;
import org.apache.storm.metrics2.StormMetricRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
/**
* A single consumer queue that uses the LMAX Disruptor. They key to the performance is
* the ability to catch up to the producer by processing tuples in batches.
*/
public class DisruptorQueue implements IStatefulObject {
private static final Logger LOG = LoggerFactory.getLogger(DisruptorQueue.class);
// Sentinel object: publishing it makes the consumer throw InterruptedException (see consumeBatchToCursor).
private static final Object INTERRUPT = new Object();
private static final String PREFIX = "disruptor-";
// Process-wide pool shared by all queues; runs the periodic background flushes.
private static final FlusherPool FLUSHER = new FlusherPool();
// Single daemon thread that periodically pushes per-queue metrics to the metrics registry.
private static final ScheduledThreadPoolExecutor METRICS_REPORTER_EXECUTOR = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat(PREFIX + "metrics-reporter").build());
/**
 * Determines the maximum number of flusher pool threads.
 * The default (100) may be overridden first by the storm config value
 * {@code STORM_WORKER_DISRUPTOR_FLUSHER_MAX_POOL_SIZE}, then by the
 * {@code num_flusher_pool_threads} system property. Failures at either
 * step are logged and the previous value is kept.
 */
private static int getNumFlusherPoolThreads() {
    int numThreads = 100;
    try {
        Map<String, Object> conf = Utils.readStormConfig();
        numThreads = Utils.getInt(conf.get(Config.STORM_WORKER_DISRUPTOR_FLUSHER_MAX_POOL_SIZE), numThreads);
    } catch (Exception e) {
        LOG.warn("Error while trying to read system config", e);
    }
    try {
        numThreads = Integer.parseInt(
                System.getProperty("num_flusher_pool_threads", String.valueOf(numThreads)));
    } catch (Exception e) {
        LOG.warn("Error while parsing number of flusher pool threads", e);
    }
    LOG.debug("Reading num_flusher_pool_threads Flusher pool threads: {}", numThreads);
    return numThreads;
}
/**
 * Shared pool that triggers and runs periodic flushes. A single daemon Timer
 * fires one repeating task per distinct flush interval; each firing submits
 * every registered Flusher for that interval to a bounded thread pool.
 */
private static class FlusherPool {
    private static final String THREAD_PREFIX = "disruptor-flush";
    // Daemon timer that fires the periodic flush triggers.
    private Timer _timer = new Timer(THREAD_PREFIX + "-trigger", true);
    private ThreadPoolExecutor _exec;
    // Registered flushers grouped by their flush interval (ms).
    private HashMap<Long, ArrayList<Flusher>> _pendingFlush = new HashMap<>();
    // One repeating TimerTask per distinct flush interval, so it can be cancelled.
    private HashMap<Long, TimerTask> _tt = new HashMap<>();

    public FlusherPool() {
        // Bounded queue + DiscardPolicy: if the pool is saturated, extra flush
        // submissions are silently dropped (the next trigger will retry).
        _exec = new ThreadPoolExecutor(1, getNumFlusherPoolThreads(), 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1024), new ThreadPoolExecutor.DiscardPolicy());
        _exec.setThreadFactory(new ThreadFactoryBuilder()
                .setDaemon(true)
                .setNameFormat(THREAD_PREFIX + "-task-pool")
                .build());
    }

    /** Registers a flusher to be run every {@code flushInterval} ms. */
    public synchronized void start(Flusher flusher, final long flushInterval) {
        ArrayList<Flusher> group = _pendingFlush.get(flushInterval);
        if (group == null) {
            // First flusher for this interval: create the group and its repeating trigger.
            group = new ArrayList<>();
            _pendingFlush.put(flushInterval, group);
            TimerTask trigger = new TimerTask() {
                @Override
                public void run() {
                    invokeAll(flushInterval);
                }
            };
            _timer.schedule(trigger, flushInterval, flushInterval);
            _tt.put(flushInterval, trigger);
        }
        group.add(flusher);
    }

    /** Submits every flusher registered for the given interval to the pool. */
    private synchronized void invokeAll(long flushInterval) {
        ArrayList<Flusher> group = _pendingFlush.get(flushInterval);
        if (group != null) {
            for (Flusher f : group) {
                _exec.submit(f);
            }
        }
    }

    /** Deregisters a flusher; cancels the timer task once no flushers remain for the interval. */
    public synchronized void stop(Flusher flusher, long flushInterval) {
        ArrayList<Flusher> group = _pendingFlush.get(flushInterval);
        if (group != null) {
            group.remove(flusher);
            if (group.isEmpty()) {
                _pendingFlush.remove(flushInterval);
                _tt.remove(flushInterval).cancel();
            }
        }
    }
}
/** Pre-allocates one mutable reference slot per ring-buffer entry. */
private static class ObjectEventFactory implements EventFactory<AtomicReference<Object>> {
    @Override
    public AtomicReference<Object> newInstance() {
        return new AtomicReference<>();
    }
}
// Per-producer-thread strategy for getting objects into the ring buffer.
private interface ThreadLocalInserter {
    // Queue obj for eventual publication; called only by the owning thread.
    public void add(Object obj);
    // Move any partially-filled batch into the overflow so flush() can drain it.
    public void forceBatch();
    // Drain the overflow into the ring buffer; block=true waits for capacity.
    public void flush(boolean block);
}
/**
 * Inserter used when the input batch size is 1: each add() attempts a direct
 * non-blocking publish to the ring buffer, falling back to an overflow queue
 * when the buffer is full.
 */
private class ThreadLocalJustInserter implements ThreadLocalInserter {
    // Serializes flush() so only one thread drains the overflow at a time.
    private final ReentrantLock _flushLock;
    // Objects that could not be published because the ring buffer was full.
    private final ConcurrentLinkedQueue<Object> _overflow;
    public ThreadLocalJustInserter() {
        _flushLock = new ReentrantLock();
        _overflow = new ConcurrentLinkedQueue<>();
    }
    //called by the main thread and should not block for an undefined period of time
    public synchronized void add(Object obj) {
        boolean inserted = false;
        // Only try a direct publish when nothing is queued ahead, to preserve ordering.
        if (_overflow.isEmpty()) {
            try {
                publishDirectSingle(obj, false);
                inserted = true;
            } catch (InsufficientCapacityException e) {
                //Ignored
            }
        }
        if (!inserted) {
            _overflowCount.incrementAndGet();
            _overflow.add(obj);
        }
        // Backpressure: fire the high water mark callback once when population + overflow gets too big.
        if (_enableBackpressure && _cb != null && (_metrics.population() + _overflowCount.get()) >= _highWaterMark) {
            try {
                if (!_throttleOn) {
                    _throttleOn = true;
                    _cb.highWaterMark();
                }
            } catch (Exception e) {
                throw new RuntimeException("Exception during calling highWaterMark callback!", e);
            }
        }
    }
    //May be called by a background thread
    public void forceBatch() {
        //NOOP
    }
    //May be called by a background thread
    public void flush(boolean block) {
        if (block) {
            _flushLock.lock();
        } else if (!_flushLock.tryLock()) {
            //Someone else if flushing so don't do anything
            return;
        }
        try {
            // peek-then-poll: if publish throws InsufficientCapacityException the
            // element stays queued for the next flush attempt.
            while (!_overflow.isEmpty()) {
                publishDirectSingle(_overflow.peek(), block);
                _overflowCount.addAndGet(-1);
                _overflow.poll();
            }
        } catch (InsufficientCapacityException e) {
            //Ignored we should not block
        } finally {
            _flushLock.unlock();
        }
    }
}
/**
 * Inserter used when the input batch size is greater than 1: objects are
 * accumulated into a batch and published to the ring buffer a batch at a time.
 */
private class ThreadLocalBatcher implements ThreadLocalInserter {
    // Serializes flush() so only one thread drains the overflow at a time.
    private final ReentrantLock _flushLock;
    // Full batches that could not be published because the ring buffer was full.
    private final ConcurrentLinkedQueue<ArrayList<Object>> _overflow;
    // Batch currently being filled by the owning thread.
    private ArrayList<Object> _currentBatch;
    public ThreadLocalBatcher() {
        _flushLock = new ReentrantLock();
        _overflow = new ConcurrentLinkedQueue<ArrayList<Object>>();
        _currentBatch = new ArrayList<Object>(_inputBatchSize);
    }
    //called by the main thread and should not block for an undefined period of time
    public synchronized void add(Object obj) {
        _currentBatch.add(obj);
        // Counted as overflow until the containing batch is actually published.
        _overflowCount.incrementAndGet();
        if (_enableBackpressure && _cb != null && (_metrics.population() + _overflowCount.get()) >= _highWaterMark) {
            try {
                if (!_throttleOn) {
                    _throttleOn = true;
                    _cb.highWaterMark();
                }
            } catch (Exception e) {
                throw new RuntimeException("Exception during calling highWaterMark callback!", e);
            }
        }
        if (_currentBatch.size() >= _inputBatchSize) {
            boolean flushed = false;
            // Only publish directly when no earlier batches are pending, to preserve ordering.
            if (_overflow.isEmpty()) {
                try {
                    publishDirect(_currentBatch, false);
                    _overflowCount.addAndGet(0 - _currentBatch.size());
                    _currentBatch.clear();
                    flushed = true;
                } catch (InsufficientCapacityException e) {
                    //Ignored we will flush later
                }
            }
            if (!flushed) {
                _overflow.add(_currentBatch);
                _currentBatch = new ArrayList<Object>(_inputBatchSize);
            }
        }
    }
    //May be called by a background thread
    public synchronized void forceBatch() {
        if (!_currentBatch.isEmpty()) {
            _overflow.add(_currentBatch);
            _currentBatch = new ArrayList<Object>(_inputBatchSize);
        }
    }
    //May be called by a background thread
    public void flush(boolean block) {
        if (block) {
            _flushLock.lock();
        } else if (!_flushLock.tryLock()) {
            //Someone else if flushing so don't do anything
            return;
        }
        try {
            // peek-then-poll: if publish throws InsufficientCapacityException the
            // batch stays queued for the next flush attempt.
            while (!_overflow.isEmpty()) {
                publishDirect(_overflow.peek(), block);
                _overflowCount.addAndGet(0 - _overflow.poll().size());
            }
        } catch (InsufficientCapacityException e) {
            //Ignored we should not block
        } finally {
            _flushLock.unlock();
        }
    }
}
/**
 * Periodic task that drains every producer thread's batcher into the ring
 * buffer. Registered with the shared FlusherPool at a fixed interval.
 */
private class Flusher implements Runnable {
    private AtomicBoolean _isFlushing = new AtomicBoolean(false);
    private final long _flushInterval;

    public Flusher(long flushInterval, String name) {
        // NOTE(review): 'name' is currently unused; kept for signature compatibility.
        _flushInterval = flushInterval;
    }

    public void run() {
        // Skip this pass entirely if a previous flush is still in flight.
        if (!_isFlushing.compareAndSet(false, true)) {
            return;
        }
        for (ThreadLocalInserter inserter : _batchers.values()) {
            inserter.forceBatch();
            inserter.flush(true);
        }
        _isFlushing.set(false);
    }

    public void start() {
        FLUSHER.start(this, _flushInterval);
    }

    public void close() {
        FLUSHER.stop(this, _flushInterval);
    }
}
/**
 * This inner class provides methods to access the metrics of the disruptor queue.
 */
public class QueueMetrics {
    private final RateTracker _rateTracker = new RateTracker(10000, 10);

    /** Sequence of the most recently published ring-buffer slot. */
    public long writePos() {
        return _buffer.getCursor();
    }

    /** Sequence of the most recently consumed ring-buffer slot. */
    public long readPos() {
        return _consumer.get();
    }

    /** Number of objects waiting in thread-local overflow, not yet in the ring buffer. */
    public long overflow() {
        return _overflowCount.get();
    }

    /** Number of published-but-unconsumed slots in the ring buffer. */
    public long population() {
        return writePos() - readPos();
    }

    public long capacity() {
        return _buffer.getBufferSize();
    }

    public float pctFull() {
        return (1.0F * population() / capacity());
    }

    public double arrivalRate() {
        return _rateTracker.reportRate();
    }

    /** Little's-law estimate of tuple sojourn time in milliseconds. */
    public double sojournTime() {
        return tuplePopulation.get() / Math.max(arrivalRate(), 0.00001) * 1000.0;
    }

    /** Snapshot of all metrics as a map (used by IStatefulObject.getState()). */
    public Object getState() {
        // Parameterized (was a raw Map) to avoid unchecked put() warnings.
        Map<String, Object> state = new HashMap<>();
        // get readPos then writePos so it's never an under-estimate
        long rp = readPos();
        long wp = writePos();
        final long tuplePop = tuplePopulation.get();
        final double arrivalRateInSecs = _rateTracker.reportRate();
        //Assume the queue is stable, in which the arrival rate is equal to the consumption rate.
        // If this assumption does not hold, the calculation of sojourn time should also consider
        // departure rate according to Queuing Theory.
        final double sojournTime = tuplePop / Math.max(arrivalRateInSecs, 0.00001) * 1000.0;
        state.put("capacity", capacity());
        state.put("population", wp - rp);
        state.put("tuple_population", tuplePop);
        state.put("write_pos", wp);
        state.put("read_pos", rp);
        state.put("arrival_rate_secs", arrivalRateInSecs);
        state.put("sojourn_time_ms", sojournTime); //element sojourn time in milliseconds
        state.put("overflow", _overflowCount.get());
        return state;
    }

    /** Records tuples entering the queue (counts tuples, not ring-buffer slots). */
    public void notifyArrivals(long counts) {
        _rateTracker.notify(counts);
        tuplePopulation.getAndAdd(counts);
    }

    /** Records tuples leaving the queue. */
    public void notifyDepartures(long counts) {
        tuplePopulation.getAndAdd(-counts);
    }

    public void close() {
        _rateTracker.close();
    }
}
// LMAX ring buffer backing this queue; each slot holds a mutable reference.
private final RingBuffer<AtomicReference<Object>> _buffer;
// Single-consumer sequence; gates the producers.
private final Sequence _consumer;
private final SequenceBarrier _barrier;
// Producer-side batch size; 1 selects ThreadLocalJustInserter instead of batching.
private final int _inputBatchSize;
// Per-producer-thread inserters, keyed by thread id.
private final ConcurrentHashMap<Long, ThreadLocalInserter> _batchers = new ConcurrentHashMap<Long, ThreadLocalInserter>();
private final Flusher _flusher;
private final QueueMetrics _metrics;
private final DisruptorMetrics _disruptorMetrics;
private String _queueName = "";
// Optional backpressure callback and its thresholds (in ring-buffer slots).
private DisruptorBackpressureCallback _cb = null;
private int _highWaterMark = 0;
private int _lowWaterMark = 0;
private boolean _enableBackpressure = false;
// Objects held in thread-local inserters that are not yet in the ring buffer.
private final AtomicLong _overflowCount = new AtomicLong(0);
// Total tuples (not slots) in flight; used for sojourn-time estimates.
private final AtomicLong tuplePopulation = new AtomicLong(0);
// Whether the high-water-mark callback has fired without a matching low-water-mark yet.
private volatile boolean _throttleOn = false;
// [^String queue-name buffer-size timeout ^String storm-id ^String component-id ^Integer task-id ^Integer worker-port :producer-type :multi-threaded :batch-size 100 :batch-timeout 1]
/**
 * Creates a disruptor-backed single-consumer queue.
 *
 * @param queueName logical name; stored with the "disruptor-" prefix
 * @param type single- or multi-producer ring buffer
 * @param size ring buffer capacity (Disruptor requires a power of two — TODO confirm callers)
 * @param readTimeout consumer wait timeout in ms; a value <= 0 blocks without timeout
 * @param inputBatchSize producer-side batch size; clamped to [1, size/2]
 * @param flushInterval background flush period in ms (minimum 1)
 * @param topologyId   used to register per-queue metrics
 * @param componentId  used to register per-queue metrics
 * @param taskId       used to register per-queue metrics
 * @param port         worker port, used to register per-queue metrics
 */
public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval, String topologyId, String componentId, Integer taskId, int port) {
    this._queueName = PREFIX + queueName;
    WaitStrategy wait;
    if (readTimeout <= 0) {
        wait = new LiteBlockingWaitStrategy();
    } else {
        wait = new TimeoutBlockingWaitStrategy(readTimeout, TimeUnit.MILLISECONDS);
    }
    _buffer = RingBuffer.create(type, new ObjectEventFactory(), size, wait);
    _consumer = new Sequence();
    _barrier = _buffer.newBarrier();
    _buffer.addGatingSequences(_consumer);
    _metrics = new QueueMetrics();
    _disruptorMetrics = StormMetricRegistry.disruptorMetrics(_queueName, topologyId, componentId, taskId, port);
    //The batch size can be no larger than half the full queue size.
    //This is mostly to avoid contention issues.
    _inputBatchSize = Math.max(1, Math.min(inputBatchSize, size/2));
    _flusher = new Flusher(Math.max(flushInterval, 1), _queueName);
    _flusher.start();
    // NOTE(review): this repeating task holds a strong reference to the queue and is
    // never cancelled individually, so the queue cannot be GC'd while the static
    // reporter executor is running — confirm this lifetime is intended.
    if(!METRICS_REPORTER_EXECUTOR.isShutdown()) {
        METRICS_REPORTER_EXECUTOR.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                _disruptorMetrics.set(_metrics);
            }
        }, 15, 15, TimeUnit.SECONDS);
    }
}
/** Returns this queue's name (already carrying the "disruptor-" prefix). */
public String getName() {
    return _queueName;
}
/** True when the ring-buffer population plus overflow meets or exceeds capacity. */
public boolean isFull() {
    long pending = _metrics.population() + _overflowCount.get();
    return pending >= _metrics.capacity();
}
/**
 * Publishes the INTERRUPT sentinel (blocking until there is room) so the consumer
 * thread stops, then shuts down this queue's flusher and metrics.
 */
public void haltWithInterrupt() {
    try {
        publishDirect(new ArrayList<Object>(Arrays.asList(INTERRUPT)), true);
        _flusher.close();
        _metrics.close();
        // NOTE(review): METRICS_REPORTER_EXECUTOR is static and shared by every queue
        // in the process, so halting one queue stops metrics reporting for all of
        // them — confirm this is intended.
        METRICS_REPORTER_EXECUTOR.shutdown();
    } catch (InsufficientCapacityException e) {
        //This should be impossible
        throw new RuntimeException(e);
    }
}
/** Consumes a batch only if entries appear to be available; otherwise returns immediately. */
public void consumeBatch(EventHandler<Object> handler) {
    if (_metrics.population() <= 0) {
        return;
    }
    consumeBatchWhenAvailable(handler);
}
/**
 * Waits (per the configured wait strategy) for at least one entry to become
 * available and consumes everything up to the available sequence. A wait-strategy
 * timeout is treated as "nothing to do" and returns silently.
 *
 * @throws RuntimeException wrapping AlertException or InterruptedException
 */
public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
    try {
        final long nextSequence = _consumer.get() + 1;
        long availableSequence = _barrier.waitFor(nextSequence);
        if (availableSequence >= nextSequence) {
            consumeBatchToCursor(availableSequence, handler);
        }
    } catch (TimeoutException te) {
        // Wait strategy timed out with nothing to consume; caller will retry.
    } catch (AlertException e) {
        throw new RuntimeException(e);
    } catch (InterruptedException e) {
        // Restore the interrupt flag before rethrowing so callers up the
        // stack can still observe the interruption (was previously swallowed).
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}
/**
 * Consumes every slot from the consumer's current position up to and including
 * {@code cursor}, handing each object to the handler; the consumer sequence is
 * advanced only after the whole batch is processed.
 *
 * @throws RuntimeException wrapping InterruptedException when the INTERRUPT
 *         sentinel is seen, or wrapping any failure from the handler/callbacks
 */
private void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
    for (long curr = _consumer.get() + 1; curr <= cursor; curr++) {
        try {
            AtomicReference<Object> mo = _buffer.get(curr);
            // Clear the slot so the reference can be GC'd once consumed.
            Object o = mo.getAndSet(null);
            if (o == INTERRUPT) {
                throw new InterruptedException("Disruptor processing interrupted");
            } else if (o == null) {
                // Log the slot actually found empty (curr) rather than the batch-end cursor.
                LOG.error("NULL found in {}:{}", this.getName(), curr);
            } else {
                _metrics.notifyDepartures(getTupleCount(o));
                handler.onEvent(o, curr, curr == cursor);
                // Backpressure: once drained below the low water mark, turn throttling off.
                if (_enableBackpressure && _cb != null && (_metrics.writePos() - curr + _overflowCount.get()) <= _lowWaterMark) {
                    try {
                        if (_throttleOn) {
                            _throttleOn = false;
                            _cb.lowWaterMark();
                        }
                    } catch (Exception e) {
                        // Preserve the original failure as the cause (previously dropped).
                        throw new RuntimeException("Exception during calling lowWaterMark callback!", e);
                    }
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    _consumer.set(cursor);
}
/** Registers the callback invoked at the high/low water marks. */
public void registerBackpressureCallback(DisruptorBackpressureCallback cb) {
    _cb = cb;
}
/** Key used to look up the calling thread's batcher in {@code _batchers}. */
private static Long getId() {
    return Thread.currentThread().getId();
}
/**
 * Counts the tuples represented by a published object.
 * A published object could be an instance of either AddressedTuple,
 * ArrayList&lt;AddressedTuple&gt;, or HashMap&lt;Integer, ArrayList&lt;TaskMessage&gt;&gt;.
 * Anything else counts as a single tuple.
 */
private long getTupleCount(Object obj) {
    long tupleCount;
    // Wildcard casts (was raw types) — size()/values() need no element type.
    if (obj instanceof ArrayList) {
        tupleCount = ((ArrayList<?>) obj).size();
    } else if (obj instanceof HashMap) {
        tupleCount = 0;
        for (Object value : ((HashMap<?, ?>) obj).values()) {
            tupleCount += ((ArrayList<?>) value).size();
        }
    } else {
        tupleCount = 1;
    }
    return tupleCount;
}
/**
 * Claims one slot in the ring buffer, stores obj, publishes it, and records the arrival.
 * When block is false, throws InsufficientCapacityException instead of waiting
 * if the buffer is full.
 */
private void publishDirectSingle(Object obj, boolean block) throws InsufficientCapacityException {
    long at;
    long numberOfTuples;
    if (block) {
        at = _buffer.next();
    } else {
        at = _buffer.tryNext();
    }
    AtomicReference<Object> m = _buffer.get(at);
    m.set(obj);
    _buffer.publish(at);
    numberOfTuples = getTupleCount(obj);
    _metrics.notifyArrivals(numberOfTuples);
}
/**
 * Claims a contiguous range of slots for the whole batch, fills them in order,
 * then publishes the range as one unit. When block is false, throws
 * InsufficientCapacityException instead of waiting if the buffer lacks room.
 * An empty batch is a no-op.
 */
private void publishDirect(ArrayList<Object> objs, boolean block) throws InsufficientCapacityException {
    int size = objs.size();
    if (size > 0) {
        long end;
        if (block) {
            end = _buffer.next(size);
        } else {
            end = _buffer.tryNext(size);
        }
        // next(size)/tryNext(size) return the highest claimed sequence; derive the start.
        long begin = end - (size - 1);
        long at = begin;
        long numberOfTuples = 0;
        for (Object obj: objs) {
            AtomicReference<Object> m = _buffer.get(at);
            m.set(obj);
            at++;
            numberOfTuples += getTupleCount(obj);
        }
        _metrics.notifyArrivals(numberOfTuples);
        // Publish the entire range at once so consumers see the batch atomically.
        _buffer.publish(begin, end);
    }
}
/**
 * Publishes a single object from the calling thread, batching per producer
 * thread; attempts a non-blocking flush afterwards so it never waits
 * indefinitely.
 */
public void publish(Object obj) {
    final Long threadId = getId();
    ThreadLocalInserter inserter = _batchers.get(threadId);
    if (inserter == null) {
        // Only the owning thread ever creates its own inserter, so a plain put is race-free.
        inserter = (_inputBatchSize > 1) ? new ThreadLocalBatcher() : new ThreadLocalJustInserter();
        _batchers.put(threadId, inserter);
    }
    inserter.add(obj);
    inserter.flush(false);
}
/** Exposes the metrics snapshot for the IStatefulObject contract. */
@Override
public Object getState() {
    return _metrics.getState();
}
/** Sets the backpressure high water mark as a fraction of queue capacity; chainable. */
public DisruptorQueue setHighWaterMark(double highWaterMark) {
    _highWaterMark = (int) (_metrics.capacity() * highWaterMark);
    return this;
}

/** Sets the backpressure low water mark as a fraction of queue capacity; chainable. */
public DisruptorQueue setLowWaterMark(double lowWaterMark) {
    _lowWaterMark = (int) (_metrics.capacity() * lowWaterMark);
    return this;
}

public int getHighWaterMark() {
    return _highWaterMark;
}

public int getLowWaterMark() {
    return _lowWaterMark;
}

/** Enables or disables backpressure callbacks; chainable. */
public DisruptorQueue setEnableBackpressure(boolean enableBackpressure) {
    _enableBackpressure = enableBackpressure;
    return this;
}

public boolean getThrottleOn() {
    return _throttleOn;
}

//This method enables the metrics to be accessed from outside of the DisruptorQueue class
public QueueMetrics getMetrics() {
    return _metrics;
}
}