blob: f5228d5acdbc114511f11537bc35e8c8bb046564 [file] [log] [blame]
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.baidu.hugegraph.computer.core.network.netty;
import org.slf4j.Logger;
import com.baidu.hugegraph.computer.core.network.TransportUtil;
import com.baidu.hugegraph.computer.core.network.message.PingMessage;
import com.baidu.hugegraph.util.Log;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
/**
* Heart beat triggered.
* Send ping message
*/
@ChannelHandler.Sharable
public class HeartbeatHandler extends ChannelDuplexHandler {
private static final Logger LOG = Log.logger(HeartbeatHandler.class);
public static final AttributeKey<Integer> TIMEOUT_HEARTBEAT_COUNT =
AttributeKey.valueOf("TIMEOUT_HEARTBEAT_COUNT");
public static final AttributeKey<Integer> MAX_TIMEOUT_HEARTBEAT_COUNT =
AttributeKey.valueOf("MAX_TIMEOUT_HEARTBEAT_COUNT");
@Override
public void userEventTriggered(final ChannelHandlerContext ctx,
Object event) throws Exception {
if (event instanceof IdleStateEvent) {
Channel channel = ctx.channel();
Integer maxTimeoutCount = channel.attr(MAX_TIMEOUT_HEARTBEAT_COUNT)
.get();
assert maxTimeoutCount != null;
Integer lastTimeoutCount = channel.attr(TIMEOUT_HEARTBEAT_COUNT)
.get();
assert lastTimeoutCount != null;
int timeoutHeartbeatCount = lastTimeoutCount + 1;
if (timeoutHeartbeatCount > maxTimeoutCount) {
LOG.info("The number of timeouts for waiting " +
"heartbeat response more than the " +
"max_timeout_heartbeat_count, close connection to " +
"'{}' from client side, count: {}",
TransportUtil.remoteAddress(channel),
timeoutHeartbeatCount);
ctx.close();
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Client IdleStateEvent trigger to send ping to " +
"'{}', count: {}",
TransportUtil.remoteAddress(channel),
timeoutHeartbeatCount);
}
ctx.writeAndFlush(PingMessage.INSTANCE)
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
channel.attr(TIMEOUT_HEARTBEAT_COUNT)
.set(timeoutHeartbeatCount);
}
} else {
super.userEventTriggered(ctx, event);
}
}
}