blob: 63983db63f73e0bccfc7c026782d65697b8979cf [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.common.network;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.google.protobuf.Message;
/**
* Defines OutgoingPacket
* <p>
* TODO -- Sanjeev will add a detailed description of this application level protocol later
* <p>
* When allocating the ByteBuffer, we have two options:
* 1. Normal java heap buffer by invoking ByteBuffer.allocate(...),
* 2. Native heap buffer by invoking ByteBuffer.allocateDirect(...),
* Though it would require extra memory copies in java heap buffer, after experiments trying to use
* both of them, we choose to use normal java heap buffer, since:
* 1. It is unsafe to use direct buffer:
* -- Direct buffer would not trigger gc;
* -- We could not control when to release the resources of direct buffer explicitly;
* -- It is hard to guarantee direct buffer would not break limitation of native heap,
* i.e. not throw OutOfMemoryError.
* <p>
* 2. Experiments are done by using direct buffer and the resources saving is negligible:
* -- Direct buffer would save, in our scenarios, less than 1% of RAM;
* -- Direct buffer could save 30%~50% CPU of Gateway thread.
* However, the CPU used by Gateway thread is negligible,
* less than 2% out of the whole usage in worst case.
* -- The extra copy is within JVM boundary; it is pretty fast.
*/
public class OutgoingPacket {
private static final Logger LOG = Logger.getLogger(OutgoingPacket.class.getName());
private ByteBuffer buffer;
public OutgoingPacket(REQID reqid, Message message) {
assert message.isInitialized();
// First calculate the total size of the packet
// including the header
int headerSize = 4;
String typename = message.getDescriptorForType().getFullName();
int dataSize = sizeRequiredToPackString(typename)
+ REQID.REQID_SIZE
+ sizeRequiredToPackMessage(message);
buffer = ByteBuffer.allocate(headerSize + dataSize);
// First write out how much data is there as the header
buffer.putInt(dataSize);
// Next write the type string
buffer.putInt(typename.length());
buffer.put(typename.getBytes());
// now the reqid
reqid.pack(buffer);
// finally the proto
// Double copy but it is designed, see the comments on top
buffer.putInt(message.getSerializedSize());
buffer.put(message.toByteArray());
// Make the buffer ready for writing out
BufferHelper.flip(buffer);
}
public static int sizeRequiredToPackString(String str) {
return 4 + str.length();
}
public static int sizeRequiredToPackMessage(Message msg) {
return 4 + msg.getSerializedSize();
}
public int writeToChannel(SocketChannel channel) {
int remaining = buffer.remaining();
assert remaining > 0;
int wrote = 0;
try {
wrote = channel.write(buffer);
} catch (IOException e) {
LOG.log(Level.SEVERE, "Error writing to channel ", e);
return -1;
}
return remaining - wrote;
}
public int size() {
return buffer.capacity();
}
}