blob: fc4f8619095578ba665bf7e215f81d7ee65672a8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.net;
import java.nio.ByteBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.utils.memory.BufferPool;
import org.apache.cassandra.utils.memory.BufferPools;
public abstract class FrameEncoder extends ChannelOutboundHandlerAdapter
{
protected static final BufferPool bufferPool = BufferPools.forNetworking();
/**
* An abstraction useful for transparently allocating buffers that can be written to upstream
* of the {@code FrameEncoder} without knowledge of the encoder's frame layout, while ensuring
* enough space to write the remainder of the frame's contents is reserved.
*/
public static class Payload
{
public static final int MAX_SIZE = 1 << 17;
// isSelfContained is a flag in the Frame API, indicating if the contents consists of only complete messages
private boolean isSelfContained;
// the buffer to write to
public final ByteBuffer buffer;
// the number of header bytes to reserve
final int headerLength;
// the number of trailer bytes to reserve
final int trailerLength;
// an API-misuse detector
private boolean isFinished = false;
Payload(boolean isSelfContained, int payloadCapacity)
{
this(isSelfContained, payloadCapacity, 0, 0);
}
Payload(boolean isSelfContained, int payloadCapacity, int headerLength, int trailerLength)
{
this.isSelfContained = isSelfContained;
this.headerLength = headerLength;
this.trailerLength = trailerLength;
buffer = bufferPool.getAtLeast(payloadCapacity + headerLength + trailerLength, BufferType.OFF_HEAP);
assert buffer.capacity() >= payloadCapacity + headerLength + trailerLength;
buffer.position(headerLength);
buffer.limit(buffer.capacity() - trailerLength);
}
void setSelfContained(boolean isSelfContained)
{
this.isSelfContained = isSelfContained;
}
// do not invoke after finish()
int length()
{
assert !isFinished;
return buffer.position() - headerLength;
}
// do not invoke after finish()
public int remaining()
{
assert !isFinished;
return buffer.remaining();
}
// do not invoke after finish()
void trim(int length)
{
assert !isFinished;
buffer.position(headerLength + length);
}
// may not be written to or queried, after this is invoked; must be passed straight to an encoder (or release called)
public void finish()
{
assert !isFinished;
isFinished = true;
buffer.limit(buffer.position() + trailerLength);
buffer.position(0);
bufferPool.putUnusedPortion(buffer);
}
public void release()
{
bufferPool.put(buffer);
}
}
public interface PayloadAllocator
{
public static final PayloadAllocator simple = Payload::new;
Payload allocate(boolean isSelfContained, int capacity);
}
public PayloadAllocator allocator()
{
return PayloadAllocator.simple;
}
/**
* Takes ownership of the lifetime of the provided buffer, which can be assumed to be managed by BufferPool
*/
abstract ByteBuf encode(boolean isSelfContained, ByteBuffer buffer);
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception
{
if (!(msg instanceof Payload))
throw new IllegalStateException("Unexpected type: " + msg);
Payload payload = (Payload) msg;
ByteBuf write = encode(payload.isSelfContained, payload.buffer);
ctx.write(write, promise);
}
}