blob: 4c037aceb91c838e9ca0778840016ba5154794d9 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.proton.engine.impl;
import java.nio.ByteBuffer;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.security.SaslFrameBody;
import org.apache.qpid.proton.amqp.transport.EmptyFrame;
import org.apache.qpid.proton.amqp.transport.FrameBody;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.framing.TransportFrame;
/**
* Writes Frames to an internal buffer for later processing by the transport.
*/
class FrameWriter {

    /** Default number of buffered bytes above which {@link #isFull()} starts reporting {@code true}. */
    static final int DEFAULT_FRAME_BUFFER_FULL_MARK = 64 * 1024;

    /** Number of bytes reserved at the start of every frame for the frame header. */
    static final int FRAME_HEADER_SIZE = 8;

    /** Frame type code written into the header by writers that emit AMQP frames. */
    static final byte AMQP_FRAME_TYPE = 0;

    /** Frame type code written into the header by writers that emit SASL frames. */
    static final byte SASL_FRAME_TYPE = 1;

    // Data offset (DOFF) byte of the frame header. The AMQP 1.0 framing rules express it in
    // 4-byte words, so 2 corresponds to the FRAME_HEADER_SIZE byte header with no extended header.
    private static final byte FRAME_DATA_OFFSET = 2;

    private final TransportImpl transport;
    private final EncoderImpl encoder;
    private final FrameWriterBuffer frameBuffer = new FrameWriterBuffer();

    // Configuration of this Frame Writer
    private int maxFrameSize;
    private final byte frameType;
    private int frameBufferMaxBytes = DEFAULT_FRAME_BUFFER_FULL_MARK;

    // State of current write operation, reset on start of each new write
    private int frameStart;

    // Frame Writer metrics
    private long framesOutput;

    /**
     * Creates a writer that encodes frames of a single type into its own internal buffer.
     * The given encoder is redirected to write into that buffer.
     *
     * @param encoder      encoder used to serialize frame bodies
     * @param maxFrameSize maximum size of a frame in bytes; a value of zero or less means no limit
     * @param frameType    type code written into every frame header, e.g. {@link #AMQP_FRAME_TYPE}
     *                     or {@link #SASL_FRAME_TYPE}
     * @param transport    transport consulted for frame tracing and used to log outgoing frames
     */
    FrameWriter(EncoderImpl encoder, int maxFrameSize, byte frameType, TransportImpl transport) {
        this.encoder = encoder;
        this.maxFrameSize = maxFrameSize;
        this.frameType = frameType;
        this.transport = transport;

        encoder.setByteBuffer(frameBuffer);
    }

    /**
     * @return {@code true} once the write position of the internal buffer has moved past the
     *         configured full mark (see {@link #setFrameWriterMaxBytes(int)})
     */
    boolean isFull() {
        return frameBuffer.position() > frameBufferMaxBytes;
    }

    /**
     * Hands buffered frame data over to the given destination buffer.
     *
     * @param dst buffer that receives the data
     * @return the value reported by the internal buffer's transfer operation
     *         (presumably the number of bytes moved into {@code dst})
     */
    int readBytes(ByteBuffer dst) {
        return frameBuffer.transferTo(dst);
    }

    /**
     * @return the number of frames written through {@code writeFrame} so far; raw headers
     *         written with {@link #writeHeader(byte[])} are not counted
     */
    long getFramesOutput() {
        return framesOutput;
    }

    /**
     * Updates the frame size limit applied to subsequently written frames.
     *
     * @param maxFrameSize maximum size of a frame in bytes; a value of zero or less means no limit
     */
    void setMaxFrameSize(int maxFrameSize) {
        this.maxFrameSize = maxFrameSize;
    }

    /**
     * Sets the number of buffered bytes above which {@link #isFull()} reports {@code true}.
     *
     * @param maxBytes the new full mark in bytes
     */
    void setFrameWriterMaxBytes(int maxBytes) {
        this.frameBufferMaxBytes = maxBytes;
    }

    /**
     * @return the number of buffered bytes above which {@link #isFull()} reports {@code true}
     */
    int getFrameWriterMaxBytes() {
        return frameBufferMaxBytes;
    }

    /**
     * Appends the given bytes to the internal buffer as-is, without any frame header.
     *
     * @param header the raw bytes to append
     */
    void writeHeader(byte[] header) {
        frameBuffer.put(header, 0, header.length);
    }

    /**
     * Writes a frame on channel zero that carries only the given body and no payload.
     *
     * @param frameBody the frame body to encode, or {@code null} for a frame with no body
     */
    void writeFrame(Object frameBody) {
        writeFrame(0, frameBody, null, null);
    }

    /**
     * Writes a single frame consisting of the header, the encoded body and as much of the
     * payload as the maximum frame size permits.
     * <p>
     * The limit of {@code payload} is restored after the copy, so bytes that did not fit into
     * this frame stay available to the caller (assuming the buffer copy consumes what it reads).
     *
     * @param channel           channel number written into the frame header (truncated to 16 bits)
     * @param frameBody         the frame body to encode, or {@code null} for a frame with no body
     * @param payload           optional payload bytes appended after the body, may be {@code null}
     * @param onPayloadTooLarge optional callback run once when body plus payload exceed the
     *                          maximum frame size, before the body is encoded a second time
     */
    void writeFrame(int channel, Object frameBody, ReadableBuffer payload, Runnable onPayloadTooLarge) {
        frameStart = frameBuffer.position();

        final int performativeSize = writePerformative(frameBody, payload, onPayloadTooLarge);
        final int capacity = maxFrameSize > 0 ? maxFrameSize - performativeSize : Integer.MAX_VALUE;
        final int available = payload == null ? 0 : payload.remaining();

        // The capacity is negative when header and body alone already exceed the maximum frame
        // size. Clamp at zero so the trace logging below never receives a negative size to use
        // as a buffer limit; with tracing disabled the result is the same as before.
        final int payloadSize = Math.max(0, Math.min(available, capacity));

        if (transport.isFrameTracingEnabled()) {
            logFrame(channel, frameBody, payload, payloadSize);
        }

        if (payloadSize > 0) {
            // Temporarily narrow the payload so that only the portion that fits is copied.
            int oldLimit = payload.limit();
            payload.limit(payload.position() + payloadSize);
            frameBuffer.put(payload);
            payload.limit(oldLimit);
        }

        endFrame(channel);

        framesOutput++;
    }

    /**
     * Encodes the frame body directly behind the space reserved for the frame header.
     *
     * @return the number of bytes between the start of the frame and the end of the encoded
     *         body, i.e. the reserved header space plus the encoded body
     */
    private int writePerformative(Object frameBody, ReadableBuffer payload, Runnable onPayloadTooLarge) {
        // Skip over the header; it is filled in by endFrame once the final size is known.
        frameBuffer.position(frameStart + FRAME_HEADER_SIZE);

        if (frameBody != null) {
            encoder.writeObject(frameBody);
        }

        int performativeSize = frameBuffer.position() - frameStart;

        if (onPayloadTooLarge != null && maxFrameSize > 0 && payload != null && (payload.remaining() + performativeSize) > maxFrameSize) {
            // Give the payload-too-large handler the chance to update the frame body, then encode
            // the body once more over the first attempt. Passing null as the handler guarantees
            // that it runs at most once per frame.
            onPayloadTooLarge.run();
            performativeSize = writePerformative(frameBody, payload, null);
        }

        return performativeSize;
    }

    /**
     * Fills in the header of the frame that started at {@code frameStart} and then restores the
     * write position to the end of the frame.
     */
    private void endFrame(int channel) {
        int frameSize = frameBuffer.position() - frameStart;
        int originalPosition = frameBuffer.position();

        frameBuffer.position(frameStart);
        frameBuffer.putInt(frameSize);
        frameBuffer.put(FRAME_DATA_OFFSET);
        frameBuffer.put(frameType);
        frameBuffer.putShort((short) channel);
        frameBuffer.position(originalPosition);
    }

    /**
     * Reports the outgoing frame to the transport log and, when one is set, the protocol tracer.
     *
     * @param payloadSize number of payload bytes that will be written into this frame
     */
    private void logFrame(int channel, Object frameBody, ReadableBuffer payload, int payloadSize) {
        ProtocolTracer tracer = transport.getProtocolTracer();

        if (frameType == AMQP_FRAME_TYPE) {
            ReadableBuffer originalPayload = null;
            if (payload != null) {
                // Work on a slice so that the caller's position and limit stay untouched.
                originalPayload = payload.slice();
                originalPayload.limit(payloadSize);
            }

            Binary payloadBin = Binary.create(originalPayload);
            FrameBody body = null;
            if (frameBody == null) {
                body = EmptyFrame.INSTANCE;
            } else {
                body = (FrameBody) frameBody;
            }

            TransportFrame frame = new TransportFrame(channel, body, payloadBin);

            transport.log(TransportImpl.OUTGOING, frame);

            if (tracer != null) {
                tracer.sentFrame(frame);
            }
        } else {
            SaslFrameBody body = (SaslFrameBody) frameBody;
            transport.log(TransportImpl.OUTGOING, body);
            if (tracer != null) {
                tracer.sentSaslBody(body);
            }
        }
    }
}