blob: 045602eccf5284829f6ee27dc942fb5728b78d5b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.common.network;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
/**
* Defines IncomingPacket
* <p>
* TODO -- Sanjeev will add a detailed description of this application level protocol later
* <p>
* When allocating the ByteBuffer, we have two options:
* 1. Normal java heap buffer by invoking ByteBuffer.allocate(...),
* 2. Native heap buffer by invoking ByteBuffer.allocateDirect(...),
* Though it would require extra memory copies in java heap buffer, after experiments trying to use
* both of them, we choose to use normal java heap buffer, since:
* 1. It is unsafe to use direct buffer:
* -- Direct buffer would not trigger gc;
* -- We could not control when to release the resources of direct buffer explicitly;
* -- It is hard to guarantee direct buffer would not break limitation of native heap,
* i.e. not throw OutOfMemoryError.
* <p>
* 2. Experiments are done by using direct buffer and the resources saving is negligible:
* -- Direct buffer would save, in our scenarios, less than 1% of RAM;
* -- Direct buffer could save 30%~50% CPU of Gateway thread.
* However, the CPU used by Gateway thread is negligible,
* less than 2% out of the whole usage in worst case.
* -- The extra copy is within JVM boundary; it is pretty fast.
*/
public class IncomingPacket {
private static final Logger LOG = Logger.getLogger(IncomingPacket.class.getName());
private ByteBuffer header;
private ByteBuffer data;
private boolean headerRead;
public IncomingPacket() {
header = ByteBuffer.allocate(4);
headerRead = false;
}
public int readFromChannel(SocketChannel channel) {
return readFromChannel(channel, Integer.MAX_VALUE);
}
public int readFromChannel(SocketChannel channel, long limit) {
if (!headerRead) {
int retval = readFromChannel(channel, header);
if (retval != 0) {
// either we didnt read fully or we had an error
return retval;
}
// We read the header fully
headerRead = true;
header.flip();
int size = header.getInt();
if (size > limit) {
LOG.log(Level.SEVERE, "packet size " + size + " exceeds limit " + limit);
return -1;
}
data = ByteBuffer.allocate(size);
}
int retval = readFromChannel(channel, data);
if (retval == 0) {
data.flip();
}
return retval;
}
private int readFromChannel(SocketChannel channel, ByteBuffer buffer) {
int remaining = buffer.remaining();
int wrote = 0;
try {
wrote = channel.read(buffer);
} catch (IOException e) {
LOG.log(Level.SEVERE, "Error in channel.read ", e);
return -1;
}
if (wrote < 0) {
// We encountered an end of stream. report error
LOG.severe("channel.read returned negative " + wrote);
return wrote;
} else {
// We wrote something
return remaining - wrote;
}
}
// TODO:- Multiple copies going on here.
public String unpackString() {
int size = data.getInt();
byte[] bytes = new byte[size];
data.get(bytes);
return new String(bytes);
}
public REQID unpackREQID() {
return new REQID(data);
}
// TODO:- Multiple copies going on here.
public void unpackMessage(Message.Builder builder) {
int size = data.getInt();
byte[] bytes = new byte[size];
data.get(bytes);
try {
builder.mergeFrom(bytes);
} catch (InvalidProtocolBufferException e) {
LOG.log(Level.SEVERE, "InvalidProtocolBufferException: ", e);
}
}
// TODO -- the calculation is not accurate but work
public int size() {
return data == null ? 0 : data.capacity();
}
}