blob: edd9a7e1b91ea54dde095d030d8a10e789397c8d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.net;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.LockSupport;
import java.util.function.IntConsumer;
import org.apache.cassandra.utils.Pair;
/**
 * Tracks a running total of bytes sent and bytes received, and uses the difference
 * ("bytes in flight") to throttle both sides:
 * <ul>
 *   <li>{@link #send(long)} parks the calling thread while the in-flight total is at or above
 *       {@code maximumInFlightBytes};</li>
 *   <li>{@link #process(int, IntConsumer)} postpones crediting received bytes when doing so would
 *       bring the in-flight total to or below {@code minimumInFlightBytes}.</li>
 * </ul>
 * All state is held in volatile fields and concurrent collections; updates to the counters are
 * made through atomic field updaters.
 */
public class BytesInFlightController
{
    private static final AtomicLongFieldUpdater<BytesInFlightController> sentBytesUpdater = AtomicLongFieldUpdater.newUpdater(BytesInFlightController.class, "sentBytes");
    private static final AtomicLongFieldUpdater<BytesInFlightController> receivedBytesUpdater = AtomicLongFieldUpdater.newUpdater(BytesInFlightController.class, "receivedBytes");

    // bounds on (sentBytes - receivedBytes); the minimum starts at zero until setInFlightByteBounds is called
    private volatile long minimumInFlightBytes, maximumInFlightBytes;
    // running totals; only ever modified through the field updaters above
    private volatile long sentBytes;
    private volatile long receivedBytes;

    // (byte count, release callback) pairs whose crediting was postponed by process()
    private final ConcurrentLinkedQueue<Pair<Integer, IntConsumer>> deferredBytes = new ConcurrentLinkedQueue<>();
    // parked senders, keyed by the receivedBytes value that must be exceeded before they may proceed
    private final ConcurrentSkipListMap<Long, Thread> waitingToSend = new ConcurrentSkipListMap<>();

    /**
     * @param maximumInFlightBytes in-flight total at or above which {@link #send(long)} blocks
     */
    BytesInFlightController(long maximumInFlightBytes)
    {
        this.maximumInFlightBytes = maximumInFlightBytes;
    }

    /**
     * Records {@code bytes} as sent, then blocks the caller if the in-flight total measured
     * <em>before</em> this send was already at or above the maximum.
     *
     * The caller is released once {@code receivedBytes} passes the computed threshold, once its
     * registration has been removed by {@link #wakeupSenders()}, or when it is interrupted.
     *
     * @param bytes number of bytes being sent
     * @throws InterruptedException if the thread is found interrupted while waiting; the interrupt
     *         status is read with {@code isInterrupted()} and so is left set
     */
    public void send(long bytes) throws InterruptedException
    {
        // value of sentBytes before this send was added
        long sentBefore = sentBytesUpdater.getAndAdd(this, bytes);
        maybeProcessDeferred();
        if ((sentBefore - receivedBytes) >= maximumInFlightBytes)
        {
            final Thread self = Thread.currentThread();
            long waitUntilReceived = sentBefore - maximumInFlightBytes;
            // overlap shouldn't occur, but cannot guarantee it when we modify maximumInFlightBytes
            Thread prev = waitingToSend.putIfAbsent(waitUntilReceived, self);
            while (prev != null)
                prev = waitingToSend.putIfAbsent(++waitUntilReceived, self);

            boolean isInterrupted;
            // Only our own registration counts: if the key has meanwhile been taken over by another
            // sender, we have already been released and must neither keep waiting on its entry...
            while (!(isInterrupted = self.isInterrupted())
                   && waitUntilReceived - receivedBytes >= 0
                   && waitingToSend.get(waitUntilReceived) == self)
            {
                LockSupport.park();
            }
            // ...nor remove it, which would leave that sender parked with nobody left to unpark it.
            waitingToSend.remove(waitUntilReceived, self);

            if (isInterrupted)
                throw new InterruptedException();
        }
    }

    /** @return the in-flight total at or below which received bytes are deferred */
    public long minimumInFlightBytes() { return minimumInFlightBytes; }

    /** @return the in-flight total at or above which senders block */
    public long maximumInFlightBytes() { return maximumInFlightBytes; }

    /**
     * Corrects the accounting after a size prediction turned out to be wrong, by adding
     * {@code predictedSentBytes - actualSentBytes} to {@code receivedBytes}.
     *
     * @param predictedSentBytes the size that was originally accounted for
     * @param actualSentBytes    the size that was really sent
     */
    void adjust(int predictedSentBytes, int actualSentBytes)
    {
        receivedBytesUpdater.addAndGet(this, predictedSentBytes - actualSentBytes);
        // over-prediction lowers the in-flight total, so senders may be able to proceed;
        // otherwise the in-flight total did not shrink and deferred bytes may now be creditable
        if (predictedSentBytes > actualSentBytes) wakeupSenders();
        else maybeProcessDeferred();
    }

    /** @return the current difference between bytes sent and bytes received */
    public long inFlight()
    {
        return sentBytes - receivedBytes;
    }

    /**
     * Accounts for {@code bytes} as received unconditionally (no minimum check, no callback) and
     * wakes any senders that may now proceed.
     *
     * @param bytes number of bytes to credit as received
     */
    public void fail(int bytes)
    {
        receivedBytesUpdater.addAndGet(this, bytes);
        wakeupSenders();
    }

    /**
     * Credits {@code bytes} as received and invokes {@code releaseBytes}, unless that would bring
     * the in-flight total to or below the minimum, in which case the pair is queued to be credited
     * later by {@link #maybeProcessDeferred()}.
     *
     * @param bytes        number of bytes received
     * @param releaseBytes callback invoked with {@code bytes} once they have been credited
     */
    public void process(int bytes, IntConsumer releaseBytes)
    {
        while (true)
        {
            long sent = sentBytes;
            long received = receivedBytes;
            long newReceived = received + bytes;
            if (sent - newReceived <= minimumInFlightBytes)
            {
                deferredBytes.add(Pair.create(bytes, releaseBytes));
                break;
            }
            // retry from fresh counter values if another thread changed receivedBytes meanwhile
            if (receivedBytesUpdater.compareAndSet(this, received, newReceived))
            {
                releaseBytes.accept(bytes);
                break;
            }
        }
        maybeProcessDeferred();
        wakeupSenders();
    }

    /**
     * Replaces both bounds, then credits any deferred bytes that the new minimum permits.
     *
     * @param minimumInFlightBytes new lower bound
     * @param maximumInFlightBytes new upper bound
     */
    void setInFlightByteBounds(long minimumInFlightBytes, long maximumInFlightBytes)
    {
        this.minimumInFlightBytes = minimumInFlightBytes;
        this.maximumInFlightBytes = maximumInFlightBytes;
        maybeProcessDeferred();
    }

    // Unparks, in ascending key order, every waiting sender whose threshold is below receivedBytes.
    //
    // unlike the rest of the class, this method does not handle wrap-around of sent/received;
    // since this shouldn't happen it's no big deal, but maybe for absurdly long runs it might.
    // if so, fix it.
    private void wakeupSenders()
    {
        Map.Entry<Long, Thread> next;
        while (null != (next = waitingToSend.firstEntry()))
        {
            if (next.getKey() - receivedBytes >= 0)
                break;

            // only the thread that wins the removal performs the unpark
            if (waitingToSend.remove(next.getKey(), next.getValue()))
                LockSupport.unpark(next.getValue());
        }
    }

    // Credits queued deferred bytes, one entry at a time, for as long as the in-flight total
    // stays above the minimum; stops when the queue is empty or the minimum is reached.
    private void maybeProcessDeferred()
    {
        while (true)
        {
            long sent = sentBytes;
            long received = receivedBytes;
            if (sent - received <= minimumInFlightBytes)
                break;

            Pair<Integer, IntConsumer> next = deferredBytes.poll();
            if (next == null)
                break;

            int receive = next.left;
            IntConsumer callbacks = next.right;
            while (true)
            {
                long newReceived = received + receive;
                if (receivedBytesUpdater.compareAndSet(this, received, newReceived))
                {
                    callbacks.accept(receive);
                    wakeupSenders();
                    break;
                }

                // lost the race: re-read the counters and re-check the minimum before retrying
                sent = sentBytes;
                received = receivedBytes;
                if (sent - received <= minimumInFlightBytes)
                {
                    // put the entry back (at the tail of the queue) for a later attempt
                    deferredBytes.add(next);
                    break; // continues with outer loop to maybe process it if minimumInFlightBytes has changed meanwhile
                }
            }
        }
    }
}