blob: f330b427f58c6d19c4851835303dfc68947901e6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package org.apache.storm.utils;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import org.apache.storm.metrics2.StormMetricRegistry;
import org.apache.storm.policy.IWaitStrategy;
import org.apache.storm.shade.org.jctools.queues.MessagePassingQueue;
import org.apache.storm.shade.org.jctools.queues.MpscArrayQueue;
import org.apache.storm.shade.org.jctools.queues.MpscUnboundedArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Queue that hands objects from producer threads to a consumer.
 *
 * <p>Elements normally travel through a bounded {@code MpscArrayQueue} ({@code recvQueue}). A second, unbounded
 * {@code MpscUnboundedArrayQueue} ({@code overflowQ}) is written only via {@link #tryPublishToOverflow(Object)} and is
 * drained by the consume methods after {@code recvQueue}.
 *
 * <p>Producers insert either one element at a time or, when the effective producer batch size is greater than 1,
 * through a per-thread batch that is pushed to {@code recvQueue} on flush.
 */
@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public class JCQueue implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(JCQueue.class);
// Exit condition that never asks to stop; used by consume(Consumer) so draining ends only when the queue is empty.
private final ExitCondition continueRunning = () -> true;
// One metrics object per task id passed to the constructor; every one is notified on arrivals/failures/drops.
private final List<JCQueueMetrics> jcqMetrics = new ArrayList<>();
// Bounded primary queue that all publish/tryPublish calls write to.
private final MpscArrayQueue<Object> recvQueue;
// only holds msgs from other workers (via WorkerTransfer), when recvQueue is full
private final MpscUnboundedArrayQueue<Object> overflowQ;
private final int overflowLimit; // ensures... overflowCount <= overflowLimit. if set to 0, disables overflow limiting.
// Effective batch size after clamping in the constructor (at least 1, at most half the requested queue size).
private final int producerBatchSz;
// Un-batched inserter, used when producerBatchSz is 1.
private final DirectInserter directInserter = new DirectInserter(this);
private final ThreadLocal<BatchInserter> thdLocalBatcher = new ThreadLocal<BatchInserter>(); // ensure 1 instance per producer thd.
// Strategy whose idle(...) is invoked between retries while a blocking publish/flush finds recvQueue full.
private final IWaitStrategy backPressureWaitStrategy;
// Name returned by getQueueName() and included in back-pressure log messages.
private final String queueName;
public JCQueue(String queueName, String metricNamePrefix, int size, int overflowLimit, int producerBatchSz,
IWaitStrategy backPressureWaitStrategy, String topologyId, String componentId, List<Integer> taskIds,
int port, StormMetricRegistry metricRegistry) {
this.queueName = queueName;
this.overflowLimit = overflowLimit;
this.recvQueue = new MpscArrayQueue<>(size);
this.overflowQ = new MpscUnboundedArrayQueue<>(size);
for (Integer taskId : taskIds) {
this.jcqMetrics.add(new JCQueueMetrics(metricNamePrefix, topologyId, componentId, taskId, port,
metricRegistry, recvQueue, overflowQ));
}
//The batch size can be no larger than half the full recvQueue size, to avoid contention issues.
this.producerBatchSz = Math.max(1, Math.min(producerBatchSz, size / 2));
this.backPressureWaitStrategy = backPressureWaitStrategy;
}
/**
 * Returns the name this queue was constructed with.
 */
public String getQueueName() {
    return this.queueName;
}
/**
 * Closes every metrics object that was created for this queue. The queues themselves are not touched.
 */
@Override
public void close() {
    jcqMetrics.forEach(metrics -> metrics.close());
}
/**
 * Non blocking. Drains the queue using an exit condition that never requests a stop, so it returns as soon as the
 * queue is found empty.
 *
 * @param consumer receives every drained element
 * @return number of elements consumed from the queue
 */
public int consume(JCQueue.Consumer consumer) {
    final ExitCondition neverStop = continueRunning;
    return consume(consumer, neverStop);
}
/**
 * Non blocking. Returns immediately if Q is empty. Runs till Q is empty OR {@code exitCond.keepRunning()} returns
 * false.
 *
 * <p>If draining is interrupted, the {@link InterruptedException} is rethrown wrapped in a
 * {@link RuntimeException} and the calling thread's interrupt status is set again, so the interruption stays
 * observable through {@link Thread#isInterrupted()} as well as through the exception cause.
 *
 * @param consumer receives every drained element
 * @param exitCond checked before each element is taken; draining stops once it returns false
 * @return number of elements consumed from Q
 * @throws RuntimeException wrapping the {@link InterruptedException} if draining was interrupted
 */
public int consume(JCQueue.Consumer consumer, ExitCondition exitCond) {
    try {
        return consumeImpl(consumer, exitCond);
    } catch (InterruptedException e) {
        // The checked exception is about to be hidden inside an unchecked one; re-assert the interrupt
        // status so code further up the stack that only polls the flag still sees the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}
/**
 * Returns the combined number of elements currently reported by the receive queue and the overflow queue.
 */
public int size() {
    int pending = recvQueue.size();
    pending += overflowQ.size();
    return pending;
}
/**
 * Returns the fill ratio of the receive queue: its current size divided by its capacity. The overflow queue is
 * not taken into account.
 */
public double getQueueLoad() {
    final double occupied = recvQueue.size();
    final double capacity = recvQueue.capacity();
    return occupied / capacity;
}
/**
 * Non blocking drain of both queues.
 *
 * <p>First polls the receive queue until it is empty or {@code exitCond} asks to stop. Then polls the overflow
 * queue, but at most as many times as the overflow size observed when that phase starts. If anything was handed
 * to the consumer, {@code consumer.flush()} is called once at the end.
 *
 * @param consumer receives every drained element
 * @param exitCond checked before each poll
 * @return total number of elements handed to the consumer
 * @throws InterruptedException declared by {@code consumer.flush()}
 */
private int consumeImpl(Consumer consumer, ExitCondition exitCond) throws InterruptedException {
    // Phase 1: primary receive queue.
    int fromRecvQueue = 0;
    Object msg;
    while (exitCond.keepRunning() && (msg = recvQueue.poll()) != null) {
        consumer.accept(msg);
        fromRecvQueue++;
    }

    // Phase 2: overflow queue. The size snapshot bounds the loop, which prevents staying stuck
    // consuming overflow while producers keep adding to it.
    final int overflowSnapshot = overflowQ.size();
    int fromOverflow = 0;
    while (exitCond.keepRunning() && fromOverflow < overflowSnapshot) {
        consumer.accept(overflowQ.poll());
        fromOverflow++;
    }

    final int consumed = fromRecvQueue + fromOverflow;
    if (consumed > 0) {
        consumer.flush();
    }
    return consumed;
}
/**
 * Non blocking single-element insert into the receive queue. On success every metrics object is notified of one
 * arrival.
 *
 * @param obj element to insert
 * @return true if the element was accepted, false if the receive queue was full
 */
private boolean tryPublishInternal(Object obj) {
    if (!recvQueue.offer(obj)) {
        return false;
    }
    for (JCQueueMetrics metrics : jcqMetrics) {
        metrics.notifyArrivals(1);
    }
    return true;
}
/**
 * Non blocking bulk insert into the receive queue. Elements are taken from the front of {@code objs} in order;
 * the list itself is not modified. Every metrics object is notified with the number of elements inserted.
 *
 * @param objs batch to insert from
 * @return how many leading elements of {@code objs} were inserted (possibly 0)
 */
private int tryPublishInternal(ArrayList<Object> objs) {
    // Single-slot array so the lambda can advance its read position through the batch.
    final int[] cursor = {0};
    final MessagePassingQueue.Supplier<Object> nextFromBatch = () -> objs.get(cursor[0]++);

    final int inserted = recvQueue.fill(nextFromBatch, objs.size());
    for (JCQueueMetrics metrics : jcqMetrics) {
        metrics.notifyArrivals(inserted);
    }
    return inserted;
}
/**
 * Picks the inserter for the calling thread: the shared direct inserter when batching is off (batch size of 1),
 * otherwise this thread's own batch inserter, created and remembered on first use.
 */
private Inserter getInserter() {
    if (producerBatchSz <= 1) {
        return directInserter;
    }
    BatchInserter batcher = thdLocalBatcher.get();
    if (batcher == null) {
        batcher = new BatchInserter(this, producerBatchSz);
        thdLocalBatcher.set(batcher);
    }
    return batcher;
}
/**
 * Blocking call. Hands {@code obj} to the calling thread's inserter, which retries until it succeeds. Can be
 * interrupted via Thread.interrupt().
 *
 * @param obj element to publish
 * @throws InterruptedException if the thread is interrupted while waiting for room
 */
public void publish(Object obj) throws InterruptedException {
    getInserter().publish(obj);
}
/**
 * Non-blocking call. Hands {@code obj} to the calling thread's inserter.
 *
 * @param obj element to publish
 * @return false if the element could not be accepted because the queue is full, true otherwise
 */
public boolean tryPublish(Object obj) {
    final boolean accepted = getInserter().tryPublish(obj);
    return accepted;
}
/**
 * Non-blocking call that writes straight to the receive queue, bypassing any producer batching that may be
 * enabled. Intended for sending flush/metrics tuples.
 *
 * @param obj element to publish
 * @return true if the element was inserted, false if the receive queue was full
 */
public boolean tryPublishDirect(Object obj) {
    final boolean inserted = tryPublishInternal(obj);
    return inserted;
}
/**
 * Un-batched write to the overflow queue. Should only be called by WorkerTransfer.
 *
 * @param obj element to append to the overflow queue
 * @return false if an overflow limit is configured (greater than 0) and has been reached, true once the element
 *     has been added
 */
public boolean tryPublishToOverflow(Object obj) {
    final boolean limitEnabled = overflowLimit > 0;
    if (limitEnabled && overflowQ.size() >= overflowLimit) {
        return false;
    }
    overflowQ.add(obj);
    return true;
}
/**
 * Reports one dropped message to every metrics object of this queue.
 */
public void recordMsgDrop() {
    jcqMetrics.forEach(metrics -> metrics.notifyDroppedMsg());
}
/**
 * Returns true when the overflow queue reports itself empty.
 */
public boolean isEmptyOverflow() {
    final boolean nothingPending = overflowQ.isEmpty();
    return nothingPending;
}
/**
 * Returns the number of elements currently reported by the overflow queue.
 */
public int getOverflowCount() {
    final int overflowed = overflowQ.size();
    return overflowed;
}
/**
 * Returns the number of elements currently reported by the receive queue (overflow excluded).
 */
public int getQueuedCount() {
    final int queued = recvQueue.size();
    return queued;
}
/**
 * Flushes the calling thread's pending batch.
 * <ul>
 *   <li>Batch size greater than 1: blocking call. With a non-empty batch it does not return until at least 1
 *       element has been moved to the receive queue or Thread.interrupt() is received.</li>
 *   <li>Batch size of 1: NO-OP. Returns immediately and does not throw.</li>
 * </ul>
 *
 * @throws InterruptedException if the thread is interrupted while waiting for room
 */
public void flush() throws InterruptedException {
    getInserter().flush();
}
/**
 * Non-blocking flush of the calling thread's pending batch.
 * <ul>
 *   <li>Batch size greater than 1: tries to flush as many elements as it can. Returns true if at least 1 element
 *       was flushed or the batch was already empty, false otherwise.</li>
 *   <li>Batch size of 1: NO-OP. Returns true immediately.</li>
 * </ul>
 *
 * @return see above
 */
public boolean tryFlush() {
    final boolean flushed = getInserter().tryFlush();
    return flushed;
}
// Strategy for getting elements into the receive queue. Implemented by DirectInserter (one element at a time)
// and BatchInserter (buffers elements and pushes them in bulk).
private interface Inserter {
// blocking call that can be interrupted using Thread.interrupt()
void publish(Object obj) throws InterruptedException;
// non-blocking; returns false when the element could not be accepted
boolean tryPublish(Object obj);
// blocking; pushes buffered elements (if the implementation buffers) to the receive queue
void flush() throws InterruptedException;
// non-blocking counterpart of flush(); the return value reports whether progress was possible
boolean tryFlush();
}
/**
 * Callback used by the consume methods of this queue.
 */
public interface Consumer extends MessagePassingQueue.Consumer<Object> {
// Invoked once for every element drained from the queue.
void accept(Object event);
// Invoked once at the end of a consume call in which at least one element was passed to accept().
void flush() throws InterruptedException;
}
/**
 * Tells a draining loop whether it may go on. Checked before every element is taken from the queue.
 *
 * <p>Declared as a functional interface because it is supplied as a lambda (for example {@code () -> true}).
 */
@FunctionalInterface
public interface ExitCondition {
    /**
     * Returns true while draining should continue, false to stop.
     */
    boolean keepRunning();
}
/* Thread safe. Same instance can be used across multiple threads */
private static class DirectInserter implements Inserter {
    // Never reassigned; final so the instance holds no mutable state of its own.
    private final JCQueue queue;

    DirectInserter(JCQueue queue) {
        this.queue = queue;
    }

    /**
     * Blocking call, that can be interrupted via Thread.interrupt. Retries the insert, idling with the queue's
     * back pressure wait strategy between attempts, until the element is accepted.
     *
     * @throws InterruptedException if the thread is found interrupted after an idle period
     */
    @Override
    public void publish(Object obj) throws InterruptedException {
        int idleCount = 0;
        while (!queue.tryPublishInternal(obj)) {
            reportInsertFailure();
            if (idleCount == 0) { // check avoids multiple log msgs when in a idle loop
                LOG.debug("Experiencing Back Pressure on recvQueue: '{}'. Entering BackPressure Wait",
                          queue.getQueueName());
            }
            idleCount = queue.backPressureWaitStrategy.idle(idleCount);
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
        }
    }

    /**
     * Non-Blocking call. return value indicates success/failure. A failed attempt is reported to the metrics.
     */
    @Override
    public boolean tryPublish(Object obj) {
        if (queue.tryPublishInternal(obj)) {
            return true;
        }
        reportInsertFailure();
        return false;
    }

    /**
     * NO-OP: nothing is buffered by this inserter.
     */
    @Override
    public void flush() throws InterruptedException {
    }

    /**
     * NO-OP: nothing is buffered by this inserter. Always returns true.
     */
    @Override
    public boolean tryFlush() {
        return true;
    }

    // Tells every metrics object of the queue that one insert attempt failed.
    private void reportInsertFailure() {
        for (JCQueueMetrics jcQueueMetric : queue.jcqMetrics) {
            jcQueueMetric.notifyInsertFailure();
        }
    }
} // class DirectInserter
/* Not thread safe. Have one instance per producer thread or synchronize externally */
private static class BatchInserter implements Inserter {
    private final int batchSz;
    private final JCQueue queue;
    // Elements accepted from the producer but not yet moved to the receive queue, oldest first.
    private final ArrayList<Object> currentBatch;

    BatchInserter(JCQueue queue, int batchSz) {
        this.queue = queue;
        this.batchSz = batchSz;
        this.currentBatch = new ArrayList<>(batchSz + 1);
    }

    /**
     * Blocking call - retries till element is successfully added. The element is appended to the current batch;
     * once the batch holds at least {@code batchSz} elements a blocking {@link #flush()} is performed.
     *
     * @throws InterruptedException if the flush is interrupted
     */
    @Override
    public void publish(Object obj) throws InterruptedException {
        currentBatch.add(obj);
        if (currentBatch.size() >= batchSz) {
            flush();
        }
    }

    /**
     * Non-Blocking call. return value indicates success/failure. If the batch is already full, a non-blocking
     * flush is attempted first; the element is rejected when that flush makes no progress.
     */
    @Override
    public boolean tryPublish(Object obj) {
        if (currentBatch.size() >= batchSz) {
            if (!tryFlush()) {
                return false;
            }
        }
        currentBatch.add(obj);
        return true;
    }

    /**
     * Blocking call - Does not return until at least 1 element is drained or Thread.interrupt() is received. Uses
     * backpressure wait strategy. Returns immediately when the batch is empty. Elements that were not drained
     * stay in the batch, in order.
     *
     * @throws InterruptedException if the thread is found interrupted after an idle period
     */
    @Override
    public void flush() throws InterruptedException {
        if (currentBatch.isEmpty()) {
            return;
        }
        int publishCount = queue.tryPublishInternal(currentBatch);
        int retryCount = 0;
        while (publishCount == 0) { // retry till at least 1 element is drained
            reportInsertFailure();
            if (retryCount == 0) { // check avoids multiple log msgs when in a idle loop
                LOG.debug("Experiencing Back Pressure when flushing batch to Q: '{}'. Entering BackPressure Wait.",
                          queue.getQueueName());
            }
            retryCount = queue.backPressureWaitStrategy.idle(retryCount);
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            publishCount = queue.tryPublishInternal(currentBatch);
        }
        dropPublished(publishCount);
    }

    /**
     * Non blocking call. tries to flush as many as possible. Returns true if at least one from non-empty currentBatch
     * was flushed or if currentBatch is empty. Returns false otherwise
     */
    @Override
    public boolean tryFlush() {
        if (currentBatch.isEmpty()) {
            return true;
        }
        int publishCount = queue.tryPublishInternal(currentBatch);
        if (publishCount == 0) {
            reportInsertFailure();
            return false;
        }
        dropPublished(publishCount);
        return true;
    }

    // Removes the first publishCount elements of the batch, i.e. the ones already moved to the receive queue.
    private void dropPublished(int publishCount) {
        currentBatch.subList(0, publishCount).clear();
    }

    // Tells every metrics object of the queue that one insert attempt failed.
    private void reportInsertFailure() {
        for (JCQueueMetrics jcQueueMetric : queue.jcqMetrics) {
            jcQueueMetric.notifyInsertFailure();
        }
    }
} // class BatchInserter
}