| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.alibaba.jstorm.message.netty; |
| |
| import backtype.storm.Config; |
| import backtype.storm.messaging.IConnection; |
| import backtype.storm.messaging.TaskMessage; |
| import backtype.storm.utils.DisruptorQueue; |
| import backtype.storm.utils.Utils; |
| import com.alibaba.jstorm.client.ConfigExtension; |
| import com.alibaba.jstorm.common.metric.*; |
| import com.alibaba.jstorm.metric.*; |
| import com.alibaba.jstorm.utils.JStormServerUtils; |
| import com.alibaba.jstorm.utils.JStormUtils; |
| import com.alibaba.jstorm.utils.NetWorkUtils; |
| import com.codahale.metrics.health.HealthCheck; |
| import org.jboss.netty.bootstrap.ClientBootstrap; |
| import org.jboss.netty.channel.Channel; |
| import org.jboss.netty.channel.ChannelFactory; |
| import org.jboss.netty.channel.ChannelFuture; |
| import org.jboss.netty.channel.ChannelFutureListener; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.net.InetSocketAddress; |
| import java.net.SocketAddress; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| class NettyClient implements IConnection { |
| private static final Logger LOG = LoggerFactory.getLogger(NettyClient.class); |
| |
| protected String name; |
| |
| protected final int max_retries; |
| protected final int base_sleep_ms; |
| protected final int max_sleep_ms; |
| protected final long timeoutMs; |
| protected final int MAX_SEND_PENDING; |
| |
| protected AtomicInteger retries; |
| |
| protected AtomicReference<Channel> channelRef; |
| protected ClientBootstrap bootstrap; |
| protected final InetSocketAddress remote_addr; |
| protected final ChannelFactory factory; |
| |
| protected final int buffer_size; |
| protected final AtomicBoolean being_closed; |
| |
| protected AtomicLong pendings; |
| protected int messageBatchSize; |
| protected AtomicReference<MessageBatch> messageBatchRef; |
| |
| protected ScheduledExecutorService scheduler; |
| |
| protected String address; |
| // doesn't use timer, due to competition |
| protected AsmHistogram sendTimer; |
| protected AsmHistogram batchSizeHistogram; |
| protected AsmMeter sendSpeed; |
| protected static AsmMeter totalSendSpeed = (AsmMeter) JStormMetrics.registerWorkerMetric(MetricUtils.workerMetricName( |
| MetricDef.NETTY_CLI_SEND_SPEED, MetricType.METER), new AsmMeter()); |
| |
| protected ReconnectRunnable reconnector; |
| protected ChannelFactory clientChannelFactory; |
| |
| protected Set<Channel> closingChannel; |
| |
| protected AtomicBoolean isConnecting = new AtomicBoolean(false); |
| |
| protected NettyConnection nettyConnection; |
| |
| protected Map stormConf; |
| |
| protected boolean connectMyself; |
| |
| protected Object channelClosing = new Object(); |
| |
| protected boolean enableNettyMetrics; |
| |
| @SuppressWarnings("rawtypes") |
| NettyClient(Map storm_conf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port, ReconnectRunnable reconnector) { |
| this.stormConf = storm_conf; |
| this.factory = factory; |
| this.scheduler = scheduler; |
| this.reconnector = reconnector; |
| |
| retries = new AtomicInteger(0); |
| channelRef = new AtomicReference<Channel>(null); |
| being_closed = new AtomicBoolean(false); |
| pendings = new AtomicLong(0); |
| |
| nettyConnection = new NettyConnection(); |
| nettyConnection.setClientPort(NetWorkUtils.ip(), ConfigExtension.getLocalWorkerPort(storm_conf)); |
| nettyConnection.setServerPort(host, port); |
| |
| // Configure |
| buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE)); |
| max_retries = Math.min(30, Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES))); |
| base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS)); |
| max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS)); |
| |
| timeoutMs = ConfigExtension.getNettyPendingBufferTimeout(storm_conf); |
| MAX_SEND_PENDING = (int) ConfigExtension.getNettyMaxSendPending(storm_conf); |
| |
| this.messageBatchSize = Utils.getInt(storm_conf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144); |
| messageBatchRef = new AtomicReference<MessageBatch>(); |
| |
| // Start the connection attempt. |
| remote_addr = new InetSocketAddress(host, port); |
| name = remote_addr.toString(); |
| connectMyself = isConnectMyself(stormConf, host, port); |
| |
| address = JStormServerUtils.getName(host, port); |
| |
| this.enableNettyMetrics = MetricUtils.isEnableNettyMetrics(storm_conf); |
| LOG.info("** enable netty metrics: {}", this.enableNettyMetrics); |
| if (!connectMyself) { |
| registerMetrics(); |
| } |
| closingChannel = new HashSet<Channel>(); |
| } |
| |
| public void registerMetrics() { |
| if (this.enableNettyMetrics) { |
| sendTimer = (AsmHistogram) JStormMetrics.registerNettyMetric( |
| MetricUtils.nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_TIME, nettyConnection), |
| MetricType.HISTOGRAM), |
| new AsmHistogram()); |
| batchSizeHistogram = (AsmHistogram) JStormMetrics.registerNettyMetric( |
| MetricUtils.nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_CLI_BATCH_SIZE, nettyConnection), |
| MetricType.HISTOGRAM), |
| new AsmHistogram()); |
| sendSpeed = (AsmMeter) JStormMetrics.registerNettyMetric(MetricUtils.nettyMetricName( |
| AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_SPEED, nettyConnection), MetricType.METER), new AsmMeter()); |
| |
| CacheGaugeHealthCheck cacheGauge = new CacheGaugeHealthCheck(messageBatchRef, |
| MetricDef.NETTY_CLI_CACHE_SIZE + ":" + nettyConnection.toString()); |
| JStormMetrics.registerNettyMetric(MetricUtils |
| .nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_CLI_CACHE_SIZE, nettyConnection), MetricType.GAUGE), |
| new AsmGauge(cacheGauge)); |
| |
| JStormMetrics.registerNettyMetric(MetricUtils |
| .nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_PENDING, nettyConnection), MetricType.GAUGE), |
| new AsmGauge(new com.codahale.metrics.Gauge<Double>() { |
| @Override |
| public Double getValue() { |
| return ((Long) pendings.get()).doubleValue(); |
| } |
| })); |
| |
| JStormHealthCheck.registerWorkerHealthCheck(MetricDef.NETTY_CLI_CACHE_SIZE + ":" + nettyConnection.toString(), |
| cacheGauge); |
| } |
| |
| JStormHealthCheck.registerWorkerHealthCheck(MetricDef.NETTY_CLI_CONNECTION + ":" + nettyConnection.toString(), |
| new HealthCheck() { |
| Result healthy = Result.healthy(); |
| Result unhealthy = Result |
| .unhealthy("NettyConnection " + nettyConnection.toString() + " is broken."); |
| |
| @Override |
| protected Result check() throws Exception { |
| if (isChannelReady() == null) { |
| return unhealthy; |
| } else { |
| return healthy; |
| } |
| } |
| |
| }); |
| } |
| |
| public void start() { |
| bootstrap = new ClientBootstrap(clientChannelFactory); |
| bootstrap.setOption("tcpNoDelay", true); |
| bootstrap.setOption("reuserAddress", true); |
| bootstrap.setOption("sendBufferSize", buffer_size); |
| bootstrap.setOption("keepAlive", true); |
| |
| // Set up the pipeline factory. |
| bootstrap.setPipelineFactory(new StormClientPipelineFactory(this, stormConf)); |
| reconnect(); |
| } |
| |
| public boolean isConnectMyself(Map conf, String host, int port) { |
| String localIp = NetWorkUtils.ip(); |
| String remoteIp = NetWorkUtils.host2Ip(host); |
| int localPort = ConfigExtension.getLocalWorkerPort(conf); |
| |
| if (localPort == port && localIp.equals(remoteIp)) { |
| return true; |
| } |
| |
| return false; |
| } |
| |
| public void notifyInterestChanged(Channel channel) { |
| if (channel.isWritable()) { |
| MessageBatch messageBatch = messageBatchRef.getAndSet(null); |
| flushRequest(channel, messageBatch); |
| } |
| } |
| |
| /** |
| * The function can't be synchronized, otherwise it will be deadlock |
| */ |
| public void doReconnect() { |
| if (channelRef.get() != null) { |
| |
| // if (channelRef.get().isWritable()) { |
| // LOG.info("already exist a writable channel, give up reconnect, {}", |
| // channelRef.get()); |
| // return; |
| // } |
| return; |
| } |
| |
| if (isClosed() == true) { |
| return; |
| } |
| |
| if (isConnecting.getAndSet(true)) { |
| LOG.info("Connect twice {}", name()); |
| return; |
| } |
| |
| long sleepMs = getSleepTimeMs(); |
| LOG.info("Reconnect ... [{}], {}, sleep {}ms", retries.get(), name, sleepMs); |
| ChannelFuture future = bootstrap.connect(remote_addr); |
| future.addListener(new ChannelFutureListener() { |
| public void operationComplete(ChannelFuture future) throws Exception { |
| isConnecting.set(false); |
| Channel channel = future.getChannel(); |
| if (future.isSuccess()) { |
| // do something else |
| LOG.info("Connection established, channel = :{}", channel); |
| setChannel(channel); |
| // handleResponse(); |
| } else { |
| LOG.info("Failed to reconnect ... [{}], {}, channel = {}, cause = {}", retries.get(), name, channel, future.getCause()); |
| reconnect(); |
| } |
| } |
| }); |
| JStormUtils.sleepMs(sleepMs); |
| } |
| |
| public void reconnect() { |
| reconnector.pushEvent(this); |
| } |
| |
| /** |
| * # of milliseconds to wait per exponential back-off policy |
| */ |
| private int getSleepTimeMs() { |
| int sleepMs = base_sleep_ms * retries.incrementAndGet(); |
| if (sleepMs > 1000) { |
| sleepMs = 1000; |
| } |
| return sleepMs; |
| } |
| |
| /** |
| * Enqueue a task message to be sent to server |
| */ |
| @Override |
| public void send(List<TaskMessage> messages) { |
| LOG.warn("Should be overload"); |
| } |
| |
| @Override |
| public void send(TaskMessage message) { |
| LOG.warn("Should be overload"); |
| } |
| |
| Channel isChannelReady() { |
| Channel channel = channelRef.get(); |
| if (channel == null) { |
| return null; |
| } |
| |
| // improve performance skill check |
| if (channel.isWritable() == false) { |
| return null; |
| } |
| |
| return channel; |
| } |
| |
| protected synchronized void flushRequest(Channel channel, final MessageBatch requests) { |
| if (requests == null || requests.isEmpty()) |
| return; |
| |
| Long batchSize = (long) requests.getEncoded_length(); |
| if (batchSizeHistogram != null) { |
| batchSizeHistogram.update(batchSize); |
| } |
| pendings.incrementAndGet(); |
| if (sendSpeed != null) { |
| sendSpeed.update(batchSize); |
| } |
| totalSendSpeed.update(batchSize); |
| ChannelFuture future = channel.write(requests); |
| future.addListener(new ChannelFutureListener() { |
| public void operationComplete(ChannelFuture future) throws Exception { |
| |
| pendings.decrementAndGet(); |
| if (!future.isSuccess()) { |
| Channel channel = future.getChannel(); |
| if (isClosed() == false) { |
| LOG.info("Failed to send requests to " + name + ": " + channel.toString() + ":", future.getCause()); |
| } |
| |
| if (null != channel) { |
| |
| exceptionChannel(channel); |
| } |
| } else { |
| // LOG.debug("{} request(s) sent", requests.size()); |
| } |
| } |
| }); |
| } |
| |
| public void unregisterMetrics() { |
| if (this.enableNettyMetrics) { |
| JStormMetrics.unregisterNettyMetric(MetricUtils.nettyMetricName( |
| AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_TIME, nettyConnection), MetricType.HISTOGRAM)); |
| JStormMetrics.unregisterNettyMetric(MetricUtils.nettyMetricName( |
| AsmMetric.mkName(MetricDef.NETTY_CLI_BATCH_SIZE, nettyConnection), MetricType.HISTOGRAM)); |
| JStormMetrics.unregisterNettyMetric(MetricUtils.nettyMetricName( |
| AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_PENDING, nettyConnection), MetricType.GAUGE)); |
| JStormMetrics.unregisterNettyMetric(MetricUtils |
| .nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_CLI_CACHE_SIZE, nettyConnection), MetricType.GAUGE)); |
| JStormMetrics.unregisterNettyMetric(MetricUtils |
| .nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_SPEED, nettyConnection), MetricType.METER)); |
| } |
| JStormHealthCheck.unregisterWorkerHealthCheck(MetricDef.NETTY_CLI_CACHE_SIZE + ":" + nettyConnection.toString()); |
| |
| JStormHealthCheck.unregisterWorkerHealthCheck(MetricDef.NETTY_CLI_CONNECTION + ":" + nettyConnection.toString()); |
| } |
| |
| /** |
| * gracefully close this client. |
| * <p/> |
| * We will send all existing requests, and then invoke close_n_release() method |
| */ |
| public void close() { |
| LOG.info("Close netty connection to {}", name()); |
| if (being_closed.compareAndSet(false, true) == false) { |
| LOG.info("Netty client has been closed."); |
| return; |
| } |
| |
| if (!connectMyself) { |
| unregisterMetrics(); |
| } |
| |
| Channel channel = channelRef.get(); |
| if (channel == null) { |
| LOG.info("Channel {} has been closed before", name()); |
| return; |
| } |
| |
| if (channel.isWritable()) { |
| MessageBatch toBeFlushed = messageBatchRef.getAndSet(null); |
| flushRequest(channel, toBeFlushed); |
| } |
| |
| // wait for pendings to exit |
| final long timeoutMilliSeconds = 10 * 1000; |
| final long start = System.currentTimeMillis(); |
| |
| LOG.info("Waiting for pending batchs to be sent with " + name() + "..., timeout: {}ms, pendings: {}", timeoutMilliSeconds, pendings.get()); |
| |
| while (pendings.get() != 0) { |
| try { |
| long delta = System.currentTimeMillis() - start; |
| if (delta > timeoutMilliSeconds) { |
| LOG.error("Timeout when sending pending batchs with {}..., there are still {} pending batchs not sent", name(), pendings.get()); |
| break; |
| } |
| Thread.sleep(1000); // sleep 1s |
| } catch (InterruptedException e) { |
| break; |
| } |
| } |
| |
| close_n_release(); |
| |
| } |
| |
| /** |
| * close_n_release() is invoked after all messages have been sent. |
| */ |
| void close_n_release() { |
| if (channelRef.get() != null) { |
| setChannel(null); |
| } |
| |
| } |
| |
| /** |
| * Avoid channel double close |
| * |
| * @param channel |
| */ |
| void closeChannel(final Channel channel) { |
| synchronized (channelClosing) { |
| if (closingChannel.contains(channel)) { |
| LOG.info(channel.toString() + " is already closed"); |
| return; |
| } |
| |
| closingChannel.add(channel); |
| } |
| |
| LOG.debug(channel.toString() + " begin to closed"); |
| ChannelFuture closeFuture = channel.close(); |
| closeFuture.addListener(new ChannelFutureListener() { |
| public void operationComplete(ChannelFuture future) throws Exception { |
| |
| synchronized (channelClosing) { |
| closingChannel.remove(channel); |
| } |
| LOG.debug(channel.toString() + " finish closed"); |
| } |
| }); |
| } |
| |
| void disconnectChannel(Channel channel) { |
| if (isClosed()) { |
| return; |
| } |
| |
| if (channel == channelRef.get()) { |
| setChannel(null); |
| reconnect(); |
| } else { |
| closeChannel(channel); |
| } |
| |
| } |
| |
| void exceptionChannel(Channel channel) { |
| if (channel == channelRef.get()) { |
| setChannel(null); |
| } else { |
| closeChannel(channel); |
| } |
| } |
| |
| void setChannel(Channel newChannel) { |
| final Channel oldChannel = channelRef.getAndSet(newChannel); |
| |
| if (newChannel != null) { |
| retries.set(0); |
| } |
| |
| final String oldLocalAddres = (oldChannel == null) ? "null" : oldChannel.getLocalAddress().toString(); |
| String newLocalAddress = (newChannel == null) ? "null" : newChannel.getLocalAddress().toString(); |
| LOG.info("Use new channel {} replace old channel {}", newLocalAddress, oldLocalAddres); |
| |
| // avoid one netty client use too much connection, close old one |
| if (oldChannel != newChannel && oldChannel != null) { |
| |
| closeChannel(oldChannel); |
| LOG.info("Successfully close old channel " + oldLocalAddres); |
| // scheduler.schedule(new Runnable() { |
| // |
| // @Override |
| // public void run() { |
| // |
| // } |
| // }, 10, TimeUnit.SECONDS); |
| |
| // @@@ todo |
| // pendings.set(0); |
| } |
| } |
| |
| @Override |
| public boolean isClosed() { |
| return being_closed.get(); |
| } |
| |
| public AtomicBoolean getBeing_closed() { |
| return being_closed; |
| } |
| |
| public int getBuffer_size() { |
| return buffer_size; |
| } |
| |
| public SocketAddress getRemoteAddr() { |
| return remote_addr; |
| } |
| |
| public String name() { |
| return name; |
| } |
| |
| public void handleResponse() { |
| LOG.warn("Should be overload"); |
| } |
| |
| @Override |
| public Object recv(Integer taskId, int flags) { |
| throw new UnsupportedOperationException("recvTask: Client connection should not receive any messages"); |
| } |
| |
| @Override |
| public void registerQueue(Integer taskId, DisruptorQueue recvQueu) { |
| throw new UnsupportedOperationException("recvTask: Client connection should not receive any messages"); |
| } |
| |
| @Override |
| public void enqueue(TaskMessage message) { |
| throw new UnsupportedOperationException("recvTask: Client connection should not receive any messages"); |
| } |
| |
| public static class CacheGaugeHealthCheck extends HealthCheck implements com.codahale.metrics.Gauge<Double> { |
| |
| AtomicReference<MessageBatch> messageBatchRef; |
| String name; |
| Result healthy; |
| |
| public CacheGaugeHealthCheck(AtomicReference<MessageBatch> messageBatchRef, String name) { |
| this.messageBatchRef = messageBatchRef; |
| this.name = name; |
| this.healthy = Result.healthy(); |
| } |
| |
| @Override |
| public Double getValue() { |
| MessageBatch messageBatch = messageBatchRef.get(); |
| if (messageBatch == null) { |
| return 0.0; |
| } else { |
| return (double) messageBatch.getEncoded_length(); |
| } |
| |
| } |
| |
| @Override |
| protected Result check() throws Exception { |
| Double size = getValue(); |
| if (size > 8 * JStormUtils.SIZE_1_M) { |
| return Result.unhealthy(name + QueueGauge.QUEUE_IS_FULL); |
| } else { |
| return healthy; |
| } |
| } |
| |
| } |
| |
| @Override |
| public boolean available() { |
| return (isChannelReady() != null); |
| } |
| } |