blob: 388651d60d155b477179c1e414260bf5022b1d35 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.runtime.executor.transfer;
import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.nemo.runtime.common.comm.ControlMessage.ByteTransferContextSetupMessage;
import org.apache.nemo.runtime.common.comm.ControlMessage.ByteTransferDataDirection;
import java.util.List;
/**
* Interprets inbound byte streams to compose frames.
* <p>
* <p>
* More specifically,
* <ul>
* <li>Recognizes the type of the frame, namely control or data.</li>
* <li>If the received bytes are a part of a control frame, waits until the full content of the frame becomes
* available and decode the frame to emit a control frame object.</li>
* <li>If the received bytes consists a data frame, supply the data to the corresponding {@link ByteInputContext}.
* </ul>
*
* <h3>Control frame specification:</h3>
* <pre>
* {@literal
* <----- HEADER ----> <----- BODY ----->
* +--------+---------+-------...-------+
* | Zeros | Length | Body |
* | 5 byte | 4 bytes | Variable length |
* +--------+---------+-------...-------+
* }
* </pre>
*
* <h3>Data frame specification:</h3>
* <pre>
* {@literal
* <---------------------------------- HEADER ---------------------------------------------------> <----- BODY ----->
* +-------+-------+-------------------+------------------+---------------+-------------+---------+-------...-------+
* | Zeros | 1 | DataDirectionFlag | NewSubStreamFlag | LastFrameFlag | TransferIdx | Length | Body |
* | 4 bit | 1 bit | 1 bit | 1 bit | 1 bit | 4 bytes | 4 bytes | Variable length |
* +-------+-------+-------------------+------------------+---------------+-------------+---------+-------...-------+
* }
* </pre>
*
* @see ByteTransportChannelInitializer
*/
final class FrameDecoder extends ByteToMessageDecoder {
private static final int HEADER_LENGTH = 9;
private final ContextManager contextManager;
/**
* The number of bytes consisting body of a control frame to be read next.
*/
private long controlBodyBytesToRead = 0;
/**
* The number of bytes consisting body of a data frame to be read next.
*/
private long dataBodyBytesToRead = 0;
/**
* The {@link ByteInputContext} to which received bytes are added.
*/
private ByteInputContext inputContext;
/**
* Whether or not the data frame currently being read is the last frame of a data message.
*/
private boolean isLastFrame;
FrameDecoder(final ContextManager contextManager) {
this.contextManager = contextManager;
}
@Override
protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List out)
throws InvalidProtocolBufferException {
while (true) {
final boolean toContinue;
if (controlBodyBytesToRead > 0) {
toContinue = onControlBodyAdded(in, out);
} else if (dataBodyBytesToRead > 0) {
onDataBodyAdded(in);
toContinue = in.readableBytes() > 0;
} else {
toContinue = onFrameStarted(ctx, in);
}
if (!toContinue) {
break;
}
}
}
/**
* Try to decode a frame header.
*
* @param ctx the channel handler context
* @param in the {@link ByteBuf} from which to read data
* @return {@code true} if a header was decoded, {@code false} otherwise
*/
private boolean onFrameStarted(final ChannelHandlerContext ctx, final ByteBuf in) {
assert (controlBodyBytesToRead == 0);
assert (dataBodyBytesToRead == 0);
assert (inputContext == null);
if (in.readableBytes() < HEADER_LENGTH) {
// cannot read a frame header frame now
return false;
}
final byte flags = in.readByte();
final int transferIndex = in.readInt();
final long length = in.readUnsignedInt();
if (length < 0) {
throw new IllegalStateException(String.format("Frame length is negative: %d", length));
}
if ((flags & ((byte) (1 << 3))) == 0) {
// setup context for reading control frame body
controlBodyBytesToRead = length;
} else {
// setup context for reading data frame body
dataBodyBytesToRead = length;
final ByteTransferDataDirection dataDirection = (flags & ((byte) (1 << 2))) == 0
? ByteTransferDataDirection.INITIATOR_SENDS_DATA : ByteTransferDataDirection.INITIATOR_RECEIVES_DATA;
final boolean newSubStreamFlag = (flags & ((byte) (1 << 1))) != 0;
isLastFrame = (flags & ((byte) (1 << 0))) != 0;
inputContext = contextManager.getInputContext(dataDirection, transferIndex);
if (inputContext == null) {
throw new IllegalStateException(String.format("Transport context for %s:%d was not found between the local"
+ "address %s and the remote address %s", dataDirection, transferIndex,
ctx.channel().localAddress(), ctx.channel().remoteAddress()));
}
if (newSubStreamFlag) {
inputContext.onNewStream();
}
if (dataBodyBytesToRead == 0) {
onDataFrameEnd();
}
}
return true;
}
/**
* Try to emit the body of the control frame.
*
* @param in the {@link ByteBuf} from which to read data
* @param out the list to which the body of the control frame is added
* @return {@code true} if the control frame body was emitted, {@code false} otherwise
* @throws InvalidProtocolBufferException when failed to parse
*/
private boolean onControlBodyAdded(final ByteBuf in, final List out)
throws InvalidProtocolBufferException {
assert (controlBodyBytesToRead > 0);
assert (dataBodyBytesToRead == 0);
assert (inputContext == null);
assert (controlBodyBytesToRead <= Integer.MAX_VALUE);
if (in.readableBytes() < controlBodyBytesToRead) {
// cannot read body now
return false;
}
final byte[] bytes;
final int offset;
if (in.hasArray()) {
bytes = in.array();
offset = in.arrayOffset() + in.readerIndex();
} else {
bytes = new byte[(int) controlBodyBytesToRead];
in.getBytes(in.readerIndex(), bytes, 0, (int) controlBodyBytesToRead);
offset = 0;
}
final ByteTransferContextSetupMessage controlMessage
= ByteTransferContextSetupMessage.PARSER.parseFrom(bytes, offset, (int) controlBodyBytesToRead);
out.add(controlMessage);
in.skipBytes((int) controlBodyBytesToRead);
controlBodyBytesToRead = 0;
return true;
}
/**
* Supply byte stream to an existing {@link ByteInputContext}.
*
* @param in the {@link ByteBuf} from which to read data
* @throws InterruptedException when interrupted while adding to {@link ByteBuf} queue
*/
private void onDataBodyAdded(final ByteBuf in) {
assert (controlBodyBytesToRead == 0);
assert (dataBodyBytesToRead > 0);
assert (inputContext != null);
// length should not exceed Integer.MAX_VALUE (since in.readableBytes() returns an int)
final long length = Math.min(dataBodyBytesToRead, in.readableBytes());
assert (length <= Integer.MAX_VALUE);
final ByteBuf body = in.readSlice((int) length).retain();
inputContext.onByteBuf(body);
dataBodyBytesToRead -= length;
if (dataBodyBytesToRead == 0) {
onDataFrameEnd();
}
}
/**
* Closes {@link ByteInputContext} if necessary and resets the internal states of the decoder.
*/
private void onDataFrameEnd() {
if (isLastFrame) {
inputContext.onContextClose();
}
inputContext = null;
}
}