blob: b0f2d9f886ae8591735a2f52ada31681bfe1bb06 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package org.apache.storm.utils;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.storm.metric.api.IStatefulObject;
import org.apache.storm.metric.internal.RateTracker;
import org.apache.storm.metrics2.JcMetrics;
import org.apache.storm.metrics2.StormMetricRegistry;
import org.apache.storm.policy.IWaitStrategy;
import org.apache.storm.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.storm.shade.org.jctools.queues.MessagePassingQueue;
import org.apache.storm.shade.org.jctools.queues.MpscArrayQueue;
import org.apache.storm.shade.org.jctools.queues.MpscUnboundedArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public class JCQueue implements IStatefulObject, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(JCQueue.class);
private static final String PREFIX = "jc-";
private static final ScheduledThreadPoolExecutor METRICS_REPORTER_EXECUTOR =
new ScheduledThreadPoolExecutor(1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(PREFIX + "metrics-reporter")
.build());
private final ExitCondition continueRunning = () -> true;
private final JcMetrics jcMetrics;
private final MpscArrayQueue<Object> recvQueue;
// only holds msgs from other workers (via WorkerTransfer), when recvQueue is full
private final MpscUnboundedArrayQueue<Object> overflowQ;
private final int overflowLimit; // ensures... overflowCount <= overflowLimit. if set to 0, disables overflow limiting.
private final int producerBatchSz;
private final DirectInserter directInserter = new DirectInserter(this);
private final ThreadLocal<BatchInserter> thdLocalBatcher = new ThreadLocal<BatchInserter>(); // ensure 1 instance per producer thd.
private final JCQueue.QueueMetrics metrics;
private final IWaitStrategy backPressureWaitStrategy;
private final String queueName;
public JCQueue(String queueName, int size, int overflowLimit, int producerBatchSz, IWaitStrategy backPressureWaitStrategy,
String topologyId, String componentId, Integer taskId, int port, StormMetricRegistry metricRegistry) {
this.queueName = queueName;
this.overflowLimit = overflowLimit;
this.recvQueue = new MpscArrayQueue<>(size);
this.overflowQ = new MpscUnboundedArrayQueue<>(size);
this.metrics = new JCQueue.QueueMetrics();
this.jcMetrics = metricRegistry.jcMetrics(queueName, topologyId, componentId, taskId, port);
//The batch size can be no larger than half the full recvQueue size, to avoid contention issues.
this.producerBatchSz = Math.max(1, Math.min(producerBatchSz, size / 2));
this.backPressureWaitStrategy = backPressureWaitStrategy;
if (!METRICS_REPORTER_EXECUTOR.isShutdown()) {
METRICS_REPORTER_EXECUTOR.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
jcMetrics.set(metrics);
}
}, 15, 15, TimeUnit.SECONDS);
}
}
public String getName() {
return queueName;
}
@Override
public void close() {
//No need to block, the task run by the executor is safe to run even after metrics are closed
METRICS_REPORTER_EXECUTOR.shutdown();
metrics.close();
}
/**
* Non blocking. Returns immediately if Q is empty. Returns number of elements consumed from Q.
*/
public int consume(JCQueue.Consumer consumer) {
return consume(consumer, continueRunning);
}
/**
* Non blocking. Returns immediately if Q is empty. Runs till Q is empty OR exitCond.keepRunning() return false. Returns number of
* elements consumed from Q.
*/
public int consume(JCQueue.Consumer consumer, ExitCondition exitCond) {
try {
return consumeImpl(consumer, exitCond);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public int size() {
return recvQueue.size() + overflowQ.size();
}
/**
* Non blocking. Returns immediately if Q is empty. Returns number of elements consumed from Q.
*/
private int consumeImpl(Consumer consumer, ExitCondition exitCond) throws InterruptedException {
int drainCount = 0;
while (exitCond.keepRunning()) {
Object tuple = recvQueue.poll();
if (tuple == null) {
break;
}
consumer.accept(tuple);
++drainCount;
}
int overflowDrainCount = 0;
int limit = overflowQ.size();
while (exitCond.keepRunning() && (overflowDrainCount < limit)) { // 2nd cond prevents staying stuck with consuming overflow
Object tuple = overflowQ.poll();
++overflowDrainCount;
consumer.accept(tuple);
}
int total = drainCount + overflowDrainCount;
if (total > 0) {
consumer.flush();
}
return total;
}
// Non Blocking. returns true/false indicating success/failure. Fails if full.
private boolean tryPublishInternal(Object obj) {
if (recvQueue.offer(obj)) {
metrics.notifyArrivals(1);
return true;
}
return false;
}
// Non Blocking. returns count of how many inserts succeeded
private int tryPublishInternal(ArrayList<Object> objs) {
MessagePassingQueue.Supplier<Object> supplier =
new MessagePassingQueue.Supplier<Object>() {
int counter = 0;
@Override
public Object get() {
return objs.get(counter++);
}
};
int count = recvQueue.fill(supplier, objs.size());
metrics.notifyArrivals(count);
return count;
}
private Inserter getInserter() {
Inserter inserter;
if (producerBatchSz > 1) {
inserter = thdLocalBatcher.get();
if (inserter == null) {
BatchInserter b = new BatchInserter(this, producerBatchSz);
inserter = b;
thdLocalBatcher.set(b);
}
} else {
inserter = directInserter;
}
return inserter;
}
/**
* Blocking call. Retries till it can successfully publish the obj. Can be interrupted via Thread.interrupt().
*/
public void publish(Object obj) throws InterruptedException {
Inserter inserter = getInserter();
inserter.publish(obj);
}
/**
* Non-blocking call, returns false if full.
**/
public boolean tryPublish(Object obj) {
Inserter inserter = getInserter();
return inserter.tryPublish(obj);
}
/**
* Non-blocking call. Bypasses any batching that may be enabled on the recvQueue. Intended for sending flush/metrics tuples
*/
public boolean tryPublishDirect(Object obj) {
return tryPublishInternal(obj);
}
/**
* Un-batched write to overflowQ. Should only be called by WorkerTransfer returns false if overflowLimit has reached
*/
public boolean tryPublishToOverflow(Object obj) {
if (overflowLimit > 0 && overflowQ.size() >= overflowLimit) {
return false;
}
overflowQ.add(obj);
return true;
}
public void recordMsgDrop() {
getMetrics().notifyDroppedMsg();
}
public boolean isEmptyOverflow() {
return overflowQ.isEmpty();
}
public int getOverflowCount() {
return overflowQ.size();
}
public int getQueuedCount() {
return recvQueue.size();
}
/**
* if(batchSz>1) : Blocking call. Does not return until at least 1 element is drained or Thread.interrupt() is received if(batchSz==1)
* : NO-OP. Returns immediately. doesnt throw.
*/
public void flush() throws InterruptedException {
Inserter inserter = getInserter();
inserter.flush();
}
/**
* if(batchSz>1) : Non-Blocking call. Tries to flush as many as it can. Returns true if flushed at least 1. if(batchSz==1) : This is a
* NO-OP. Returns true immediately.
*/
public boolean tryFlush() {
Inserter inserter = getInserter();
return inserter.tryFlush();
}
@Override
public Object getState() {
return metrics.getState();
}
//This method enables the metrics to be accessed from outside of the JCQueue class
public JCQueue.QueueMetrics getMetrics() {
return metrics;
}
private interface Inserter {
// blocking call that can be interrupted using Thread.interrupt()
void publish(Object obj) throws InterruptedException;
boolean tryPublish(Object obj);
void flush() throws InterruptedException;
boolean tryFlush();
}
public interface Consumer extends MessagePassingQueue.Consumer<Object> {
void accept(Object event);
void flush() throws InterruptedException;
}
public interface ExitCondition {
boolean keepRunning();
}
/* Thread safe. Same instance can be used across multiple threads */
private static class DirectInserter implements Inserter {
private JCQueue queue;
DirectInserter(JCQueue queue) {
this.queue = queue;
}
/**
* Blocking call, that can be interrupted via Thread.interrupt
*/
@Override
public void publish(Object obj) throws InterruptedException {
boolean inserted = queue.tryPublishInternal(obj);
int idleCount = 0;
while (!inserted) {
queue.metrics.notifyInsertFailure();
if (idleCount == 0) { // check avoids multiple log msgs when in a idle loop
LOG.debug("Experiencing Back Pressure on recvQueue: '{}'. Entering BackPressure Wait", queue.getName());
}
idleCount = queue.backPressureWaitStrategy.idle(idleCount);
if (Thread.interrupted()) {
throw new InterruptedException();
}
inserted = queue.tryPublishInternal(obj);
}
}
/**
* Non-Blocking call. return value indicates success/failure
*/
@Override
public boolean tryPublish(Object obj) {
boolean inserted = queue.tryPublishInternal(obj);
if (!inserted) {
queue.metrics.notifyInsertFailure();
return false;
}
return true;
}
@Override
public void flush() throws InterruptedException {
}
@Override
public boolean tryFlush() {
return true;
}
} // class DirectInserter
/* Not thread safe. Have one instance per producer thread or synchronize externally */
private static class BatchInserter implements Inserter {
private final int batchSz;
private JCQueue queue;
private ArrayList<Object> currentBatch;
BatchInserter(JCQueue queue, int batchSz) {
this.queue = queue;
this.batchSz = batchSz;
this.currentBatch = new ArrayList<>(batchSz + 1);
}
/**
* Blocking call - retires till element is successfully added.
*/
@Override
public void publish(Object obj) throws InterruptedException {
currentBatch.add(obj);
if (currentBatch.size() >= batchSz) {
flush();
}
}
/**
* Non-Blocking call. return value indicates success/failure
*/
@Override
public boolean tryPublish(Object obj) {
if (currentBatch.size() >= batchSz) {
if (!tryFlush()) {
return false;
}
}
currentBatch.add(obj);
return true;
}
/**
* Blocking call - Does not return until at least 1 element is drained or Thread.interrupt() is received. Uses backpressure wait
* strategy.
*/
@Override
public void flush() throws InterruptedException {
if (currentBatch.isEmpty()) {
return;
}
int publishCount = queue.tryPublishInternal(currentBatch);
int retryCount = 0;
while (publishCount == 0) { // retry till at least 1 element is drained
queue.metrics.notifyInsertFailure();
if (retryCount == 0) { // check avoids multiple log msgs when in a idle loop
LOG.debug("Experiencing Back Pressure when flushing batch to Q: {}. Entering BackPressure Wait.", queue.getName());
}
retryCount = queue.backPressureWaitStrategy.idle(retryCount);
if (Thread.interrupted()) {
throw new InterruptedException();
}
publishCount = queue.tryPublishInternal(currentBatch);
}
currentBatch.subList(0, publishCount).clear();
}
/**
* Non blocking call. tries to flush as many as possible. Returns true if at least one from non-empty currentBatch was flushed or if
* currentBatch is empty. Returns false otherwise
*/
@Override
public boolean tryFlush() {
if (currentBatch.isEmpty()) {
return true;
}
int publishCount = queue.tryPublishInternal(currentBatch);
if (publishCount == 0) {
queue.metrics.notifyInsertFailure();
return false;
} else {
currentBatch.subList(0, publishCount).clear();
return true;
}
}
} // class BatchInserter
/**
* This inner class provides methods to access the metrics of the JCQueue.
*/
public class QueueMetrics implements Closeable {
private final RateTracker arrivalsTracker = new RateTracker(10000, 10);
private final RateTracker insertFailuresTracker = new RateTracker(10000, 10);
private final AtomicLong droppedMessages = new AtomicLong(0);
public long population() {
return recvQueue.size();
}
public long capacity() {
return recvQueue.capacity();
}
public Object getState() {
Map<String, Object> state = new HashMap<>();
final double arrivalRateInSecs = arrivalsTracker.reportRate();
long tuplePop = population();
// Assume the recvQueue is stable, in which the arrival rate is equal to the consumption rate.
// If this assumption does not hold, the calculation of sojourn time should also consider
// departure rate according to Queuing Theory.
final double sojournTime = tuplePop / Math.max(arrivalRateInSecs, 0.00001) * 1000.0;
long cap = capacity();
float pctFull = (1.0F * tuplePop / cap);
state.put("capacity", cap);
state.put("pct_full", pctFull);
state.put("population", tuplePop);
state.put("arrival_rate_secs", arrivalRateInSecs);
state.put("sojourn_time_ms", sojournTime); //element sojourn time in milliseconds
state.put("insert_failures", insertFailuresTracker.reportRate());
state.put("dropped_messages", droppedMessages);
state.put("overflow", overflowQ.size());
return state;
}
public void notifyArrivals(long counts) {
arrivalsTracker.notify(counts);
}
public void notifyInsertFailure() {
insertFailuresTracker.notify(1);
}
public void notifyDroppedMsg() {
droppedMessages.incrementAndGet();
}
@Override
public void close() {
arrivalsTracker.close();
insertFailuresTracker.close();
}
}
}