blob: 8697907cd454321d085cb22de389479fe19a8929 [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package backtype.storm.messaging.netty;
import backtype.storm.Config;
import backtype.storm.messaging.ConnectionWithStatus;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.metric.api.IStatefulObject;
import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
import backtype.storm.utils.Utils;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static;
/**
 * A Netty client for sending task messages to a remote destination (Netty server).
 *
 * Implementation details:
 *
 * - Sending messages, i.e. writing to the channel, is performed asynchronously.
 * - Messages are sent in batches to optimize for network throughput at the expense of network latency. The message
 *   batch size is configurable.
 * - Connecting and reconnecting are performed asynchronously.
 *   - Note: The current implementation drops any messages that are being enqueued for sending if the connection to
 *     the remote destination is currently unavailable.
 */
public class Client extends ConnectionWithStatus implements IStatefulObject {
private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
private static final String PREFIX = "Netty-Client-";
private static final long NO_DELAY_MS = 0L;
private final Map stormConf;
private final StormBoundedExponentialBackoffRetry retryPolicy;
private final ClientBootstrap bootstrap;
private final InetSocketAddress dstAddress;
protected final String dstAddressPrefixedName;
* The channel used for all write operations from this client to the remote destination.
private final AtomicReference<Channel> channelRef = new AtomicReference<Channel>();
* Maximum number of reconnection attempts we will perform after a disconnect before giving up.
private final int maxReconnectionAttempts;
* Total number of connection attempts.
private final AtomicInteger totalConnectionAttempts = new AtomicInteger(0);
* Number of connection attempts since the last disconnect.
private final AtomicInteger connectionAttempts = new AtomicInteger(0);
* Number of messages successfully sent to the remote destination.
private final AtomicInteger messagesSent = new AtomicInteger(0);
* Number of messages that could not be sent to the remote destination.
private final AtomicInteger messagesLost = new AtomicInteger(0);
* Number of messages buffered in memory.
private final AtomicLong pendingMessages = new AtomicLong(0);
* This flag is set to true if and only if a client instance is being closed.
private volatile boolean closing = false;
private final Context context;
private final HashedWheelTimer scheduler;
private final MessageBuffer batcher;
private final Object writeLock = new Object();
/**
 * Creates a client for {@code host:port} and schedules the first asynchronous connection attempt.
 *
 * Reads the netty buffer size, message batch size (default 262144), max retries and min/max back-off sleep
 * from {@code stormConf}.
 *
 * @param stormConf topology configuration
 * @param factory   Netty channel factory used by the bootstrap
 * @param scheduler shared timer on which connection attempts are scheduled
 * @param host      remote host
 * @param port      remote port
 * @param context   owning messaging context
 */
Client(Map stormConf, ChannelFactory factory, HashedWheelTimer scheduler, String host, int port, Context context) {
    this.stormConf = stormConf;
    closing = false;
    this.scheduler = scheduler;
    this.context = context;
    int bufferSize = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
    LOG.info("creating Netty Client, connecting to {}:{}, bufferSize: {}", host, port, bufferSize);
    int messageBatchSize = Utils.getInt(stormConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);

    maxReconnectionAttempts = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
    int minWaitMs = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
    int maxWaitMs = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
    retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, maxReconnectionAttempts);

    bootstrap = createClientBootstrap(factory, bufferSize);
    dstAddress = new InetSocketAddress(host, port);
    dstAddressPrefixedName = prefixedName(dstAddress);
    batcher = new MessageBuffer(messageBatchSize);

    // Initiate connection to remote destination. This is done last so that every final field (notably
    // batcher, which Netty callbacks use via notifyInterestChanged) is assigned before the timer thread
    // can start running the Connect task.
    scheduleConnect(NO_DELAY_MS);
}
private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize) {
ClientBootstrap bootstrap = new ClientBootstrap(factory);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("sendBufferSize", bufferSize);
bootstrap.setOption("keepAlive", true);
bootstrap.setPipelineFactory(new StormClientPipelineFactory(this));
return bootstrap;
private String prefixedName(InetSocketAddress dstAddress) {
if (null != dstAddress) {
return PREFIX + dstAddress.toString();
return "";
* We will retry connection with exponential back-off policy
private void scheduleConnect(long delayMs) {
scheduler.newTimeout(new Connect(dstAddress), delayMs, TimeUnit.MILLISECONDS);
private boolean reconnectingAllowed() {
return !closing && connectionAttempts.get() <= (maxReconnectionAttempts + 1);
private boolean connectionEstablished(Channel channel) {
// Because we are using TCP (which is a connection-oriented transport unlike UDP), a connection is only fully
// established iff the channel is connected. That is, a TCP-based channel must be in the CONNECTED state before
// anything can be read or written to the channel.
// See:
// -
// -
return channel != null && channel.isConnected();
* Note: Storm will check via this method whether a worker can be activated safely during the initial startup of a
* topology. The worker will only be activated once all of the its connections are ready.
public Status status() {
if (closing) {
return Status.Closed;
} else if (!connectionEstablished(channelRef.get())) {
return Status.Connecting;
} else {
return Status.Ready;
* Receiving messages is not supported by a client.
* @throws java.lang.UnsupportedOperationException whenever this method is being called.
public Iterator<TaskMessage> recv(int flags, int clientId) {
throw new UnsupportedOperationException("Client connection should not receive any messages");
public void send(int taskId, byte[] payload) {
TaskMessage msg = new TaskMessage(taskId, payload);
List<TaskMessage> wrapper = new ArrayList<TaskMessage>(1);
* Enqueue task messages to be sent to the remote destination (cf. `host` and `port`).
public void send(Iterator<TaskMessage> msgs) {
if (closing) {
int numMessages = iteratorSize(msgs);
LOG.error("discarding {} messages because the Netty client to {} is being closed", numMessages,
if (!hasMessages(msgs)) {
Channel channel = getConnectedChannel();
if (channel == null) {
* Connection is unavailable. We will drop pending messages and let at-least-once message replay kick in.
* Another option would be to buffer the messages in memory. But this option has the risk of causing OOM errors,
* especially for topologies that disable message acking because we don't know whether the connection recovery will
* succeed or not, and how long the recovery will take.
synchronized (writeLock) {
while (msgs.hasNext()) {
TaskMessage message =;
MessageBatch full = batcher.add(message);
if(full != null){
flushMessages(channel, full);
synchronized (writeLock) {
// Netty's internal buffer is not full and we still have message left in the buffer.
// We should write the unfilled MessageBatch immediately to reduce latency
MessageBatch batch = batcher.drain();
if(batch != null) {
flushMessages(channel, batch);
} else {
// Channel's buffer is full, meaning that we have time to wait other messages to arrive, and create a bigger
// batch. This yields better throughput.
// We can rely on `notifyInterestChanged` to push these messages as soon as there is spece in Netty's buffer
// because we know `Channel.isWritable` was false after the messages were already in the buffer.
private Channel getConnectedChannel() {
Channel channel = channelRef.get();
if (connectionEstablished(channel)) {
return channel;
} else {
// Closing the channel and reconnecting should be done before handling the messages.
boolean reconnectScheduled = closeChannelAndReconnect(channel);
if (reconnectScheduled) {
// Log the connection error only once
LOG.error("connection to {} is unavailable", dstAddressPrefixedName);
return null;
private boolean hasMessages(Iterator<TaskMessage> msgs) {
return msgs != null && msgs.hasNext();
private void dropMessages(Iterator<TaskMessage> msgs) {
// We consume the iterator by traversing and thus "emptying" it.
int msgCount = iteratorSize(msgs);
private int iteratorSize(Iterator<TaskMessage> msgs) {
int size = 0;
if (msgs != null) {
while (msgs.hasNext()) {
return size;
* Asynchronously writes the message batch to the channel.
* If the write operation fails, then we will close the channel and trigger a reconnect.
private void flushMessages(Channel channel, final MessageBatch batch) {
if (null == batch || batch.isEmpty()) {
final int numMessages = batch.size();
LOG.debug("writing {} messages to channel {}", batch.size(), channel.toString());
ChannelFuture future = channel.write(batch);
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
pendingMessages.addAndGet(0 - numMessages);
if (future.isSuccess()) {
LOG.debug("sent {} messages to {}", numMessages, dstAddressPrefixedName);
} else {
LOG.error("failed to send {} messages to {}: {}", numMessages, dstAddressPrefixedName,
* Schedule a reconnect if we closed a non-null channel, and acquired the right to
* provide a replacement by successfully setting a null to the channel field
* @param channel
* @return if the call scheduled a re-connect task
private boolean closeChannelAndReconnect(Channel channel) {
if (channel != null) {
if (channelRef.compareAndSet(channel, null)) {
return true;
return false;
* Gracefully close this client.
public void close() {
if (!closing) {"closing Netty Client {}", dstAddressPrefixedName);
// Set closing to true to prevent any further reconnection attempts.
closing = true;
private void waitForPendingMessagesToBeSent() {"waiting up to {} ms to send {} pending messages to {}",
PENDING_MESSAGES_FLUSH_TIMEOUT_MS, pendingMessages.get(), dstAddressPrefixedName);
long totalPendingMsgs = pendingMessages.get();
long startMs = System.currentTimeMillis();
while (pendingMessages.get() != 0) {
try {
long deltaMs = System.currentTimeMillis() - startMs;
LOG.error("failed to send all pending messages to {} within timeout, {} of {} messages were not " +
"sent", dstAddressPrefixedName, pendingMessages.get(), totalPendingMsgs);
catch (InterruptedException e) {
private void closeChannel() {
Channel channel = channelRef.get();
if (channel != null) {
LOG.debug("channel to {} closed", dstAddressPrefixedName);
public Object getState() {"Getting metrics for client connection to {}", dstAddressPrefixedName);
HashMap<String, Object> ret = new HashMap<String, Object>();
ret.put("reconnects", totalConnectionAttempts.getAndSet(0));
ret.put("sent", messagesSent.getAndSet(0));
ret.put("pending", pendingMessages.get());
ret.put("lostOnSend", messagesLost.getAndSet(0));
ret.put("dest", dstAddress.toString());
String src = srcAddressName();
if (src != null) {
ret.put("src", src);
return ret;
public Map getStormConf() {
return stormConf;
private String srcAddressName() {
String name = null;
Channel channel = channelRef.get();
if (channel != null) {
SocketAddress address = channel.getLocalAddress();
if (address != null) {
name = address.toString();
return name;
public String toString() {
return String.format("Netty client for connecting to %s", dstAddressPrefixedName);
* Called by Netty thread on change in channel interest
* @param channel
public void notifyInterestChanged(Channel channel) {
synchronized (writeLock) {
// Channel is writable again, write if there are any messages pending
MessageBatch pending = batcher.drain();
flushMessages(channel, pending);
* Asynchronously establishes a Netty connection to the remote address
* This task runs on a single thread shared among all clients, and thus
* should not perform operations that block.
private class Connect implements TimerTask {
private final InetSocketAddress address;
public Connect(InetSocketAddress address) {
this.address = address;
private void reschedule(Throwable t) {
String baseMsg = String.format("connection attempt %s to %s failed", connectionAttempts,
String failureMsg = (t == null) ? baseMsg : baseMsg + ": " + t.toString();
long nextDelayMs = retryPolicy.getSleepTimeMs(connectionAttempts.get(), 0);
public void run(Timeout timeout) throws Exception {
if (reconnectingAllowed()) {
final int connectionAttempt = connectionAttempts.getAndIncrement();
LOG.debug("connecting to {} [attempt {}]", address.toString(), connectionAttempt);
ChannelFuture future = bootstrap.connect(address);
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
// This call returns immediately
Channel newChannel = future.getChannel();
if (future.isSuccess() && connectionEstablished(newChannel)) {
boolean setChannel = channelRef.compareAndSet(null, newChannel);
LOG.debug("successfully connected to {}, {} [attempt {}]", address.toString(), newChannel.toString(),
if (messagesLost.get() > 0) {
LOG.warn("Re-connection to {} was successful but {} messages has been lost so far", address.toString(), messagesLost.get());
} else {
Throwable cause = future.getCause();
if (newChannel != null) {
} else {
throw new RuntimeException("Giving up to scheduleConnect to " + dstAddressPrefixedName + " after " +
connectionAttempts + " failed attempts. " + messagesLost.get() + " messages were lost");