blob: 063447a73f1abf6930fea4512ce1f2ab1be20687 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License
package org.apache.storm.utils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.storm.metric.api.IStatefulObject;
import org.apache.storm.metric.internal.RateTracker;
import org.apache.storm.metrics2.JcMetrics;
import org.apache.storm.metrics2.StormMetricRegistry;
import org.apache.storm.policy.IWaitStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JCQueue implements IStatefulObject, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(JCQueue.class);
private static final String PREFIX = "jc-";
private static final ScheduledThreadPoolExecutor METRICS_REPORTER_EXECUTOR =
new ScheduledThreadPoolExecutor(1,
new ThreadFactoryBuilder()
.setNameFormat(PREFIX + "metrics-reporter")
private final ExitCondition continueRunning = () -> true;
private final JcMetrics jcMetrics;
private final MpscArrayQueue<Object> recvQueue;
// only holds msgs from other workers (via WorkerTransfer), when recvQueue is full
private final MpscUnboundedArrayQueue<Object> overflowQ;
private final int overflowLimit; // ensures... overflowCount <= overflowLimit. if set to 0, disables overflow limiting.
private final int producerBatchSz;
private final DirectInserter directInserter = new DirectInserter(this);
private final ThreadLocal<BatchInserter> thdLocalBatcher = new ThreadLocal<BatchInserter>(); // ensure 1 instance per producer thd.
private final JCQueue.QueueMetrics metrics;
private final IWaitStrategy backPressureWaitStrategy;
private final String queueName;
public JCQueue(String queueName, int size, int overflowLimit, int producerBatchSz, IWaitStrategy backPressureWaitStrategy,
String topologyId, String componentId, Integer taskId, int port, StormMetricRegistry metricRegistry) {
this.queueName = queueName;
this.overflowLimit = overflowLimit;
this.recvQueue = new MpscArrayQueue<>(size);
this.overflowQ = new MpscUnboundedArrayQueue<>(size);
this.metrics = new JCQueue.QueueMetrics();
this.jcMetrics = metricRegistry.jcMetrics(queueName, topologyId, componentId, taskId, port);
//The batch size can be no larger than half the full recvQueue size, to avoid contention issues.
this.producerBatchSz = Math.max(1, Math.min(producerBatchSz, size / 2));
this.backPressureWaitStrategy = backPressureWaitStrategy;
METRICS_REPORTER_EXECUTOR.scheduleAtFixedRate(new Runnable() {
public void run() {
}, 15, 15, TimeUnit.SECONDS);
public String getName() {
return queueName;
public void close() {
//No need to block, the task run by the executor is safe to run even after metrics are closed
* Non blocking. Returns immediately if Q is empty. Returns number of elements consumed from Q.
public int consume(JCQueue.Consumer consumer) {
return consume(consumer, continueRunning);
* Non blocking. Returns immediately if Q is empty. Runs till Q is empty OR exitCond.keepRunning() return false. Returns number of
* elements consumed from Q.
public int consume(JCQueue.Consumer consumer, ExitCondition exitCond) {
try {
return consumeImpl(consumer, exitCond);
} catch (InterruptedException e) {
throw new RuntimeException(e);
public int size() {
return recvQueue.size() + overflowQ.size();
* Non blocking. Returns immediately if Q is empty. Returns number of elements consumed from Q.
private int consumeImpl(Consumer consumer, ExitCondition exitCond) throws InterruptedException {
int drainCount = 0;
while (exitCond.keepRunning()) {
Object tuple = recvQueue.poll();
if (tuple == null) {
int overflowDrainCount = 0;
int limit = overflowQ.size();
while (exitCond.keepRunning() && (overflowDrainCount < limit)) { // 2nd cond prevents staying stuck with consuming overflow
Object tuple = overflowQ.poll();
int total = drainCount + overflowDrainCount;
if (total > 0) {
return total;
// Non Blocking. returns true/false indicating success/failure. Fails if full.
private boolean tryPublishInternal(Object obj) {
if (recvQueue.offer(obj)) {
return true;
return false;
// Non Blocking. returns count of how many inserts succeeded
private int tryPublishInternal(ArrayList<Object> objs) {
MessagePassingQueue.Supplier<Object> supplier =
new MessagePassingQueue.Supplier<Object>() {
int counter = 0;
public Object get() {
return objs.get(counter++);
int count = recvQueue.fill(supplier, objs.size());
return count;
private Inserter getInserter() {
Inserter inserter;
if (producerBatchSz > 1) {
inserter = thdLocalBatcher.get();
if (inserter == null) {
BatchInserter b = new BatchInserter(this, producerBatchSz);
inserter = b;
} else {
inserter = directInserter;
return inserter;
* Blocking call. Retries till it can successfully publish the obj. Can be interrupted via Thread.interrupt().
public void publish(Object obj) throws InterruptedException {
Inserter inserter = getInserter();
* Non-blocking call, returns false if full.
public boolean tryPublish(Object obj) {
Inserter inserter = getInserter();
return inserter.tryPublish(obj);
* Non-blocking call. Bypasses any batching that may be enabled on the recvQueue. Intended for sending flush/metrics tuples
public boolean tryPublishDirect(Object obj) {
return tryPublishInternal(obj);
* Un-batched write to overflowQ. Should only be called by WorkerTransfer returns false if overflowLimit has reached
public boolean tryPublishToOverflow(Object obj) {
if (overflowLimit > 0 && overflowQ.size() >= overflowLimit) {
return false;
return true;
public void recordMsgDrop() {
public boolean isEmptyOverflow() {
return overflowQ.isEmpty();
public int getOverflowCount() {
return overflowQ.size();
public int getQueuedCount() {
return recvQueue.size();
* if(batchSz>1) : Blocking call. Does not return until at least 1 element is drained or Thread.interrupt() is received if(batchSz==1)
* : NO-OP. Returns immediately. doesnt throw.
public void flush() throws InterruptedException {
Inserter inserter = getInserter();
* if(batchSz>1) : Non-Blocking call. Tries to flush as many as it can. Returns true if flushed at least 1. if(batchSz==1) : This is a
* NO-OP. Returns true immediately.
public boolean tryFlush() {
Inserter inserter = getInserter();
return inserter.tryFlush();
public Object getState() {
return metrics.getState();
//This method enables the metrics to be accessed from outside of the JCQueue class
public JCQueue.QueueMetrics getMetrics() {
return metrics;
private interface Inserter {
// blocking call that can be interrupted using Thread.interrupt()
void publish(Object obj) throws InterruptedException;
boolean tryPublish(Object obj);
void flush() throws InterruptedException;
boolean tryFlush();
public interface Consumer extends MessagePassingQueue.Consumer<Object> {
void accept(Object event);
void flush() throws InterruptedException;
public interface ExitCondition {
boolean keepRunning();
/* Thread safe. Same instance can be used across multiple threads */
private static class DirectInserter implements Inserter {
private JCQueue queue;
public DirectInserter(JCQueue queue) {
this.queue = queue;
* Blocking call, that can be interrupted via Thread.interrupt
public void publish(Object obj) throws InterruptedException {
boolean inserted = queue.tryPublishInternal(obj);
int idleCount = 0;
while (!inserted) {
if (idleCount == 0) { // check avoids multiple log msgs when in a idle loop
LOG.debug("Experiencing Back Pressure on recvQueue: '{}'. Entering BackPressure Wait", queue.getName());
idleCount = queue.backPressureWaitStrategy.idle(idleCount);
if (Thread.interrupted()) {
throw new InterruptedException();
inserted = queue.tryPublishInternal(obj);
* Non-Blocking call. return value indicates success/failure
public boolean tryPublish(Object obj) {
boolean inserted = queue.tryPublishInternal(obj);
if (!inserted) {
return false;
return true;
public void flush() throws InterruptedException {
public boolean tryFlush() {
return true;
} // class DirectInserter
/* Not thread safe. Have one instance per producer thread or synchronize externally */
private static class BatchInserter implements Inserter {
private final int batchSz;
private JCQueue queue;
private ArrayList<Object> currentBatch;
public BatchInserter(JCQueue queue, int batchSz) {
this.queue = queue;
this.batchSz = batchSz;
this.currentBatch = new ArrayList<>(batchSz + 1);
* Blocking call - retires till element is successfully added.
public void publish(Object obj) throws InterruptedException {
if (currentBatch.size() >= batchSz) {
* Non-Blocking call. return value indicates success/failure
public boolean tryPublish(Object obj) {
if (currentBatch.size() >= batchSz) {
if (!tryFlush()) {
return false;
return true;
* Blocking call - Does not return until at least 1 element is drained or Thread.interrupt() is received. Uses backpressure wait
* strategy.
public void flush() throws InterruptedException {
if (currentBatch.isEmpty()) {
int publishCount = queue.tryPublishInternal(currentBatch);
int retryCount = 0;
while (publishCount == 0) { // retry till at least 1 element is drained
if (retryCount == 0) { // check avoids multiple log msgs when in a idle loop
LOG.debug("Experiencing Back Pressure when flushing batch to Q: {}. Entering BackPressure Wait.", queue.getName());
retryCount = queue.backPressureWaitStrategy.idle(retryCount);
if (Thread.interrupted()) {
throw new InterruptedException();
publishCount = queue.tryPublishInternal(currentBatch);
currentBatch.subList(0, publishCount).clear();
* Non blocking call. tries to flush as many as possible. Returns true if at least one from non-empty currentBatch was flushed or if
* currentBatch is empty. Returns false otherwise
public boolean tryFlush() {
if (currentBatch.isEmpty()) {
return true;
int publishCount = queue.tryPublishInternal(currentBatch);
if (publishCount == 0) {
return false;
} else {
currentBatch.subList(0, publishCount).clear();
return true;
} // class BatchInserter
* This inner class provides methods to access the metrics of the JCQueue.
public class QueueMetrics implements Closeable {
private final RateTracker arrivalsTracker = new RateTracker(10000, 10);
private final RateTracker insertFailuresTracker = new RateTracker(10000, 10);
private final AtomicLong droppedMessages = new AtomicLong(0);
public long population() {
return recvQueue.size();
public long capacity() {
return recvQueue.capacity();
public Object getState() {
Map<String, Object> state = new HashMap<>();
final double arrivalRateInSecs = arrivalsTracker.reportRate();
long tuplePop = population();
// Assume the recvQueue is stable, in which the arrival rate is equal to the consumption rate.
// If this assumption does not hold, the calculation of sojourn time should also consider
// departure rate according to Queuing Theory.
final double sojournTime = tuplePop / Math.max(arrivalRateInSecs, 0.00001) * 1000.0;
long cap = capacity();
float pctFull = (1.0F * tuplePop / cap);
state.put("capacity", cap);
state.put("pct_full", pctFull);
state.put("population", tuplePop);
state.put("arrival_rate_secs", arrivalRateInSecs);
state.put("sojourn_time_ms", sojournTime); //element sojourn time in milliseconds
state.put("insert_failures", insertFailuresTracker.reportRate());
state.put("dropped_messages", droppedMessages);
state.put("overflow", overflowQ.size());
return state;
public void notifyArrivals(long counts) {
public void notifyInsertFailure() {
public void notifyDroppedMsg() {
public void close() {