blob: 113594ad18b6ae4d444b0ebf3a4f8b0597cba42f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.pacemaker.codec;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channel;
import backtype.storm.generated.HBMessage;
import backtype.storm.generated.HBServerMessageType;
import org.jboss.netty.buffer.ChannelBuffer;
import backtype.storm.utils.Utils;
import backtype.storm.messaging.netty.ControlMessage;
import backtype.storm.messaging.netty.SaslMessageToken;
public class ThriftDecoder extends FrameDecoder {
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
long available = buf.readableBytes();
if(available < 2) {
return null;
}
buf.markReaderIndex();
int thriftLen = buf.readInt();
available -= 4;
if(available < thriftLen) {
// We haven't received the entire object yet, return and wait for more bytes.
buf.resetReaderIndex();
return null;
}
buf.discardReadBytes();
HBMessage m;
if(buf.hasArray()) {
m = Utils.thriftDeserialize(HBMessage.class, buf.array(), 0, thriftLen);
buf.readerIndex(buf.readerIndex() + thriftLen);
}
else {
byte serialized[] = new byte[thriftLen];
buf.readBytes(serialized, 0, thriftLen);
m = Utils.thriftDeserialize(HBMessage.class, serialized);
}
if(m.get_type() == HBServerMessageType.CONTROL_MESSAGE) {
ControlMessage cm = ControlMessage.read(m.get_data().get_message_blob());
return cm;
}
else if(m.get_type() == HBServerMessageType.SASL_MESSAGE_TOKEN) {
SaslMessageToken sm = SaslMessageToken.read(m.get_data().get_message_blob());
return sm;
}
else {
return m;
}
}
}