blob: 38e793032a996947e88d8c1a8d2979fc88353cff [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.message.netty;
import backtype.storm.messaging.TaskMessage;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.common.metric.AsmHistogram;
import com.alibaba.jstorm.common.metric.AsmMeter;
import com.alibaba.jstorm.common.metric.AsmMetric;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.metric.MetricDef;
import com.alibaba.jstorm.utils.NetWorkUtils;
import com.alibaba.jstorm.utils.TimeUtils;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
public class MessageDecoder extends FrameDecoder {
private static final Logger LOG = LoggerFactory.getLogger(MessageDecoder.class);
// here doesn't use Timer due to competition
private static AsmHistogram msgDecodeTime = (AsmHistogram) JStormMetrics.registerWorkerMetric(
MetricUtils.workerMetricName(MetricDef.NETWORK_MSG_DECODE_TIME, MetricType.HISTOGRAM), new AsmHistogram());
private static AsmMeter recvSpeed = (AsmMeter) JStormMetrics.registerWorkerMetric(
MetricUtils.workerMetricName(MetricDef.NETTY_SRV_RECV_SPEED, MetricType.METER), new AsmMeter());
private static Map<Channel, AsmHistogram> networkTransmitTimeMap = new HashMap<Channel, AsmHistogram>();
private static Map<Channel, String> transmitNameMap = new HashMap<Channel, String>();
private boolean isServer;
private String localIp;
private int localPort;
private boolean enableTransitTimeMetrics;
public MessageDecoder(boolean isServer, Map conf) {
this.isServer = isServer;
this.localPort = ConfigExtension.getLocalWorkerPort(conf);
this.localIp = NetWorkUtils.ip();
this.enableTransitTimeMetrics = MetricUtils.isEnableNettyMetrics(conf);
}
/*
* Each ControlMessage is encoded as: code (<0) ... short(2) Each TaskMessage is encoded as: task (>=0) ... short(2) len ... int(4) payload ... byte[] *
*/
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
// Make sure that we have received at least a short
long available = buf.readableBytes();
// Length of control message is 10.
// Minimum length of a task message is 6(short taskId, int length).
if (available < 6) {
// need more data
return null;
}
Long startTime = null;
if (isServer) {
startTime = System.nanoTime();
}
try {
// Mark the current buffer position before reading task/len field
// because the whole frame might not be in the buffer yet.
// We will reset the buffer position to the marked position if
// there's not enough bytes in the buffer.
buf.markReaderIndex();
// read the short field
short code = buf.readShort();
available -= 2;
// case 1: Control message
ControlMessage ctrl_msg = ControlMessage.mkMessage(code);
if (ctrl_msg != null) {
if (available < 12) {
// The time stamp bytes were not received yet - return null.
buf.resetReaderIndex();
return null;
}
long timeStamp = buf.readLong();
int clientPort = buf.readInt();
available -= 12;
if (ctrl_msg == ControlMessage.EOB_MESSAGE) {
long interval = System.currentTimeMillis() - timeStamp;
if (interval < 0)
interval = 0;
if(enableTransitTimeMetrics) {
AsmHistogram netTransTime = getTransmitHistogram(channel, clientPort);
if (netTransTime != null) {
netTransTime.update(interval * TimeUtils.NS_PER_US);
}
}
}
recvSpeed.update(ControlMessage.encodeLength());
return ctrl_msg;
}
// case 2: task Message
short task = code;
// Make sure that we have received at least an integer (length)
if (available < 4) {
// need more data
buf.resetReaderIndex();
return null;
}
// Read the length field.
int length = buf.readInt();
if (length <= 0) {
LOG.info("Receive one message whose TaskMessage's message length is {}", length);
return new TaskMessage(task, null);
}
// Make sure if there's enough bytes in the buffer.
available -= 4;
if (available < length) {
// The whole bytes were not received yet - return null.
buf.resetReaderIndex();
return null;
}
// There's enough bytes in the buffer. Read it.
ChannelBuffer payload = buf.readBytes(length);
// Successfully decoded a frame.
// Return a TaskMessage object
byte[] rawBytes = payload.array();
// @@@ TESTING CODE
// LOG.info("Receive task:{}, length: {}, data:{}",
// task, length, JStormUtils.toPrintableString(rawBytes));
TaskMessage ret = new TaskMessage(task, rawBytes);
recvSpeed.update(rawBytes.length + 6);
return ret;
} finally {
if (isServer) {
Long endTime = System.nanoTime();
msgDecodeTime.update((endTime - startTime) / TimeUtils.NS_PER_US);
}
}
}
public AsmHistogram getTransmitHistogram(Channel channel, int clientPort) {
AsmHistogram netTransTime = networkTransmitTimeMap.get(channel);
if (netTransTime == null) {
InetSocketAddress sockAddr = (InetSocketAddress) (channel.getRemoteAddress());
String nettyConnection = NettyConnection.mkString(sockAddr.getAddress().getHostAddress(), clientPort, localIp, localPort);
netTransTime =
(AsmHistogram) JStormMetrics.registerNettyMetric(
MetricUtils.nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_SRV_MSG_TRANS_TIME, nettyConnection), MetricType.HISTOGRAM),
new AsmHistogram());
networkTransmitTimeMap.put(channel, netTransTime);
transmitNameMap.put(channel, nettyConnection);
LOG.info("Register Transmit Histogram of {}, channel {}", nettyConnection, channel);
}
return netTransTime;
}
public static void removeTransmitHistogram(Channel channel) {
AsmHistogram netTransTime = networkTransmitTimeMap.remove(channel);
if (netTransTime != null) {
String nettyConnection = transmitNameMap.remove(channel);
JStormMetrics.unregisterNettyMetric(MetricUtils.nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_SRV_MSG_TRANS_TIME, nettyConnection),
MetricType.HISTOGRAM));
LOG.info("Remove Transmit Histogram of {}, channel {}", nettyConnection, channel);
}
}
public static void removeTransmitHistogram(String nettyConnection) {
Channel oldChannel = null;
for (Entry<Channel, String> entry : transmitNameMap.entrySet()) {
if (nettyConnection.equals(entry.getValue())) {
oldChannel = entry.getKey();
}
}
removeTransmitHistogram(oldChannel);
}
}