blob: 6b5cb7bbe25f188bfd393f41918e3176d1428d88 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.runtime.executor.bytetransfer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.Recycler;
import org.apache.nemo.runtime.common.comm.ControlMessage;
import javax.annotation.Nullable;
import javax.inject.Inject;
import java.util.List;
/**
* Encodes a data frame into bytes.
*
* @see FrameDecoder
*/
@ChannelHandler.Sharable
final class DataFrameEncoder extends MessageToMessageEncoder<DataFrameEncoder.DataFrame> {
private static final int TRANSFER_INDEX_LENGTH = Integer.BYTES;
private static final int BODY_LENGTH_LENGTH = Integer.BYTES;
private static final int HEADER_LENGTH = Byte.BYTES + TRANSFER_INDEX_LENGTH + BODY_LENGTH_LENGTH;
// the maximum length of a frame body. 2**32 - 1
static final long LENGTH_MAX = 4294967295L;
@Inject
private DataFrameEncoder() {
}
@Override
protected void encode(final ChannelHandlerContext ctx, final DataFrame in, final List out) {
// encode header
final ByteBuf header = ctx.alloc().ioBuffer(HEADER_LENGTH, HEADER_LENGTH);
byte flags = (byte) 0;
flags |= (byte) (1 << 3);
if (in.contextId.getDataDirection() == ControlMessage.ByteTransferDataDirection.INITIATOR_RECEIVES_DATA) {
flags |= (byte) (1 << 2);
}
if (in.opensSubStream) {
flags |= (byte) (1 << 1);
}
if (in.closesContext) {
flags |= (byte) (1 << 0);
}
header.writeByte(flags);
header.writeInt(in.contextId.getTransferIndex());
// in.length should not exceed the range of unsigned int
assert (in.length <= LENGTH_MAX);
header.writeInt((int) in.length);
out.add(header);
// encode body
if (in.body != null) {
out.add(in.body);
}
// recycle DataFrame object
in.recycle();
}
/**
* Data frame representation.
*/
static final class DataFrame {
private static final Recycler<DataFrame> RECYCLER = new Recycler<DataFrame>() {
@Override
protected DataFrame newObject(final Recycler.Handle handle) {
return new DataFrame(handle);
}
};
/**
* Creates a {@link DataFrame}.
*
* @param handle the recycler handle
*/
private DataFrame(final Recycler.Handle handle) {
this.handle = handle;
}
private final Recycler.Handle handle;
private ByteTransferContext.ContextId contextId;
@Nullable
private Object body;
private long length;
private boolean opensSubStream;
private boolean closesContext;
/**
* Creates a {@link DataFrame} to supply content to sub-stream.
*
* @param contextId the context id
* @param body the body or {@code null}
* @param length the length of the body, in bytes
* @param opensSubStream whether this frame opens a new sub-stream or not
* @return the {@link DataFrame} object
*/
static DataFrame newInstance(final ByteTransferContext.ContextId contextId,
@Nullable final Object body,
final long length,
final boolean opensSubStream) {
final DataFrame dataFrame = RECYCLER.get();
dataFrame.contextId = contextId;
dataFrame.body = body;
dataFrame.length = length;
dataFrame.opensSubStream = opensSubStream;
dataFrame.closesContext = false;
return dataFrame;
}
/**
* Creates a {@link DataFrame} to close the whole context.
*
* @param contextId the context id
* @return the {@link DataFrame} object
*/
static DataFrame newInstance(final ByteTransferContext.ContextId contextId) {
final DataFrame dataFrame = RECYCLER.get();
dataFrame.contextId = contextId;
dataFrame.body = null;
dataFrame.length = 0;
dataFrame.opensSubStream = false;
dataFrame.closesContext = true;
return dataFrame;
}
/**
* Recycles this object.
*/
void recycle() {
body = null;
handle.recycle(this);
}
}
}