blob: 983db3ef51107b072c3ad54dc6657b730520af7a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.net;
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.locks.LockSupport;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import static java.lang.Math.max;
/**
* A {@link DataOutputStreamPlus} that writes ASYNCHRONOUSLY to a Netty Channel.
*
* The close() and flush() methods synchronously wait for pending writes, and will propagate any exceptions
* encountered in writing them to the wire.
*
* The correctness of this class depends on the ChannelPromise we create against a Channel always being completed,
* which appears to be a guarantee provided by Netty so long as the event loop is running.
*
* There are two logical threads accessing the state in this class: the eventLoop of the channel, and the writer
* (the writer thread may change, so long as only one utilises the class at any time).
* Each thread has exclusive write access to certain state in the class, with the other thread only viewing the state,
* simplifying concurrency considerations.
*/
public abstract class AsyncChannelOutputPlus extends BufferedDataOutputStreamPlus
{
public static class FlushException extends IOException
{
public FlushException(String message)
{
super(message);
}
public FlushException(String message, Throwable cause)
{
super(message, cause);
}
}
final Channel channel;
/** the number of bytes we have begun flushing; updated only by writer */
private volatile long flushing;
/** the number of bytes we have finished flushing, successfully or otherwise; updated only by eventLoop */
private volatile long flushed;
/** the number of bytes we have finished flushing to the network; updated only by eventLoop */
private long flushedToNetwork;
/** any error that has been thrown during a flush; updated only by eventLoop */
private volatile Throwable flushFailed;
/**
* state for pausing until flushing has caught up - store the number of bytes we need to be flushed before
* we should be signalled, and store ourselves in {@link #waiting}; once the flushing thread exceeds this many
* total bytes flushed, any Thread stored in waiting will be signalled.
*
* This works exactly like using a WaitQueue, except that we only need to manage a single waiting thread.
*/
private volatile long signalWhenFlushed; // updated only by writer
private volatile Thread waiting; // updated only by writer
public AsyncChannelOutputPlus(Channel channel)
{
super(null, null);
this.channel = channel;
}
/**
* Create a ChannelPromise for a flush of the given size.
* <p>
* This method will not return until the write is permitted by the provided watermarks and in flight bytes,
* and on its completion will mark the requested bytes flushed.
* <p>
* If this method returns normally, the ChannelPromise MUST be writtenAndFlushed, or else completed exceptionally.
*/
protected ChannelPromise beginFlush(long byteCount, long lowWaterMark, long highWaterMark) throws IOException
{
waitForSpace(byteCount, lowWaterMark, highWaterMark);
return AsyncChannelPromise.withListener(channel, future -> {
if (future.isSuccess() && null == flushFailed)
{
flushedToNetwork += byteCount;
releaseSpace(byteCount);
}
else if (null == flushFailed)
{
Throwable cause = future.cause();
if (cause == null)
{
cause = new FlushException("Flush failed for unknown reason");
cause.fillInStackTrace();
}
flushFailed = cause;
releaseSpace(flushing - flushed);
}
else
{
assert flushing == flushed;
}
});
}
/**
* Imposes our lowWaterMark/highWaterMark constraints, and propagates any exceptions thrown by prior flushes.
*
* If we currently have lowWaterMark or fewer bytes flushing, we are good to go.
* If our new write will not take us over our highWaterMark, we are good to go.
* Otherwise, we wait until either of these conditions are met.
*
* This may only be invoked by the writer thread, never by the eventLoop.
*
* @throws IOException if a prior asynchronous flush failed
*/
private void waitForSpace(long bytesToWrite, long lowWaterMark, long highWaterMark) throws IOException
{
// decide when we would be willing to carry on writing
// we are always writable if we have lowWaterMark or fewer bytes, no matter how many bytes we are flushing
// our callers should not be supplying more than (highWaterMark - lowWaterMark) bytes, but we must work correctly if they do
long wakeUpWhenFlushing = highWaterMark - bytesToWrite;
waitUntilFlushed(max(lowWaterMark, wakeUpWhenFlushing), lowWaterMark);
flushing += bytesToWrite;
}
/**
* Implementation of waitForSpace, which calculates what flushed points we need to wait for,
* parks if necessary and propagates flush failures.
*
* This may only be invoked by the writer thread, never by the eventLoop.
*/
void waitUntilFlushed(long wakeUpWhenExcessBytesWritten, long signalWhenExcessBytesWritten) throws IOException
{
// we assume that we are happy to wake up at least as early as we will be signalled; otherwise we will never exit
assert signalWhenExcessBytesWritten <= wakeUpWhenExcessBytesWritten;
// flushing shouldn't change during this method invocation, so our calculations for signal and flushed are consistent
long wakeUpWhenFlushed = flushing - wakeUpWhenExcessBytesWritten;
if (flushed < wakeUpWhenFlushed)
parkUntilFlushed(wakeUpWhenFlushed, flushing - signalWhenExcessBytesWritten);
propagateFailedFlush();
}
/**
* Utility method for waitUntilFlushed, which actually parks the current thread until the necessary
* number of bytes have been flushed
*
* This may only be invoked by the writer thread, never by the eventLoop.
*/
protected void parkUntilFlushed(long wakeUpWhenFlushed, long signalWhenFlushed)
{
assert wakeUpWhenFlushed <= signalWhenFlushed;
assert waiting == null;
this.waiting = Thread.currentThread();
this.signalWhenFlushed = signalWhenFlushed;
while (flushed < wakeUpWhenFlushed)
LockSupport.park();
waiting = null;
}
/**
* Update our flushed count, and signal any waiters.
*
* This may only be invoked by the eventLoop, never by the writer thread.
*/
protected void releaseSpace(long bytesFlushed)
{
long newFlushed = flushed + bytesFlushed;
flushed = newFlushed;
Thread thread = waiting;
if (thread != null && signalWhenFlushed <= newFlushed)
LockSupport.unpark(thread);
}
private void propagateFailedFlush() throws IOException
{
Throwable t = flushFailed;
if (t != null)
{
if (SocketFactory.isCausedByConnectionReset(t))
throw new FlushException("The channel this output stream was writing to has been closed", t);
throw new FlushException("This output stream is in an unsafe state after an asynchronous flush failed", t);
}
}
@Override
abstract protected void doFlush(int count) throws IOException;
abstract public long position();
public long flushed()
{
// external flushed (that which has had flush() invoked implicitly or otherwise) == internal flushing
return flushing;
}
public long flushedToNetwork()
{
return flushedToNetwork;
}
/**
* Perform an asynchronous flush, then waits until all outstanding flushes have completed
*
* @throws IOException if any flush fails
*/
@Override
public void flush() throws IOException
{
doFlush(0);
waitUntilFlushed(0, 0);
}
/**
* Flush any remaining writes, and release any buffers.
*
* The channel is not closed, as it is assumed to be managed externally.
*
* WARNING: This method requires mutual exclusivity with all other producer methods to run safely.
* It should only be invoked by the owning thread, never the eventLoop; the eventLoop should propagate
* errors to {@link #flushFailed}, which will propagate them to the producer thread no later than its
* final invocation to {@link #close()} or {@link #flush()} (that must not be followed by any further writes).
*/
@Override
public void close() throws IOException
{
try
{
flush();
}
finally
{
discard();
}
}
/**
* Discard any buffered data, and the buffers that contain it.
* May be invoked instead of {@link #close()} if we terminate exceptionally.
*/
public abstract void discard();
@Override
protected WritableByteChannel newDefaultChannel()
{
throw new UnsupportedOperationException();
}
}