blob: 8ef0a8f0e631bc9581415fedfb66e02441c7d821 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.net;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
import io.netty.channel.WriteBufferWaterMark;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
/**
* A {@link DataOutputStreamPlus} that writes ASYNCHRONOUSLY to a Netty Channel.
*
* Intended as single use, to write one (large) message.
*
* The close() and flush() methods synchronously wait for pending writes, and will propagate any exceptions
* encountered in writing them to the wire.
*
* The correctness of this class depends on the ChannelPromise we create against a Channel always being completed,
* which appears to be a guarantee provided by Netty so long as the event loop is running.
*/
public class AsyncMessageOutputPlus extends AsyncChannelOutputPlus
{
/**
* the maximum {@link #highWaterMark} and minimum {@link #lowWaterMark} number of bytes we have flushing
* during which we should still be writing to the channel.
*
* i.e., if we are at or below the {@link #lowWaterMark} we should definitely start writing again;
* if we are at or above the {@link #highWaterMark} we should definitely stop writing;
* if we are inbetween, it is OK to either write or not write
*
* note that we consider the bytes we are about to write to our high water mark, but not our low.
* i.e., we will not begin a write that would take us over our high water mark, unless not doing so would
* take us below our low water mark.
*
* This is somewhat arbitrary accounting, and a meaningless distinction for flushes of a consistent size.
*/
@SuppressWarnings("JavaDoc")
private final int highWaterMark;
private final int lowWaterMark;
private final int bufferSize;
private final int messageSize;
private boolean closing;
private final FrameEncoder.PayloadAllocator payloadAllocator;
private volatile FrameEncoder.Payload payload;
AsyncMessageOutputPlus(Channel channel, int bufferSize, int messageSize, FrameEncoder.PayloadAllocator payloadAllocator)
{
super(channel);
WriteBufferWaterMark waterMark = channel.config().getWriteBufferWaterMark();
this.lowWaterMark = waterMark.low();
this.highWaterMark = waterMark.high();
this.messageSize = messageSize;
this.bufferSize = Math.min(messageSize, bufferSize);
this.payloadAllocator = payloadAllocator;
allocateBuffer();
}
private void allocateBuffer()
{
payload = payloadAllocator.allocate(false, bufferSize);
buffer = payload.buffer;
}
@Override
protected void doFlush(int count) throws IOException
{
if (!channel.isOpen())
throw new ClosedChannelException();
// flush the current backing write buffer only if there's any pending data
FrameEncoder.Payload flush = payload;
int byteCount = flush.length();
if (byteCount == 0)
return;
if (byteCount + flushed() > (closing ? messageSize : messageSize - 1))
throw new InvalidSerializedSizeException(messageSize, byteCount + flushed());
flush.finish();
ChannelPromise promise = beginFlush(byteCount, lowWaterMark, highWaterMark);
channel.writeAndFlush(flush, promise);
allocateBuffer();
}
public void close() throws IOException
{
closing = true;
if (flushed() == 0 && payload != null)
payload.setSelfContained(true);
super.close();
}
public long position()
{
return flushed() + payload.length();
}
/**
* Discard any buffered data, and the buffers that contain it.
* May be invoked instead of {@link #close()} if we terminate exceptionally.
*/
public void discard()
{
if (payload != null)
{
payload.release();
payload = null;
buffer = null;
}
}
}