// blob: fed684ef8b9fef8cd251d0c2f33cbdfdc68fd242 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package backtype.storm.messaging.netty;
import backtype.storm.Config;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
import backtype.storm.utils.Utils;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
public class Client implements IConnection {
/** Logger shared by all instances of this class. */
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
/** Prefix that name() puts in front of the remote address. */
private static final String PREFIX = "Netty-Client-";
// Reconnect tuning values, read once from the storm configuration in the constructor.
private final int max_retries;
private final int base_sleep_ms;
private final int max_sleep_ms;
private final StormBoundedExponentialBackoffRetry retryPolicy;
/** Holds the channel currently in use; null until connect() succeeds or after a failed write clears it. */
private final AtomicReference<Channel> channelRef;
private final ClientBootstrap bootstrap;
/** Remote endpoint this client connects to; assigned once in the constructor. */
private final InetSocketAddress remote_addr;
// NOTE(review): not referenced by any method of this class as written; kept to leave the field set unchanged.
private final Random random = new Random();
private final ChannelFactory factory;
private final int buffer_size;
/**
 * Set to true by close(). Declared volatile because the flusher task created in the
 * constructor reads it without holding this object's monitor.
 */
private volatile boolean closing;
/** Size handed to each new MessageBatch. */
private final int messageBatchSize;
/** Number of batches written to the channel whose write future has not completed yet. */
private final AtomicLong pendings;
/** Batch currently being filled; null when there is nothing buffered. Mutated only by synchronized methods. */
MessageBatch messageBatch = null;
/** Wall-clock time (ms) after which the flusher should flush; Long.MAX_VALUE means "nothing to flush". */
private final AtomicLong flushCheckTimer;
/** Period of the flusher task, in milliseconds. */
private final int flushCheckInterval;
private final ScheduledExecutorService scheduler;
/**
 * Creates a client for one remote endpoint.
 *
 * Reads the Netty settings from the storm configuration, prepares the bootstrap,
 * submits the first connection attempt to the scheduler, and schedules the
 * periodic flusher that sends a buffered batch once its flush deadline has passed
 * and the channel is writable.
 *
 * @param storm_conf storm configuration map the Netty settings are read from
 * @param factory    channel factory used to build the client bootstrap
 * @param scheduler  executor that runs the initial connect and the periodic flusher
 * @param host       remote host name
 * @param port       remote port
 */
@SuppressWarnings("rawtypes")
Client(Map storm_conf, ChannelFactory factory,
        ScheduledExecutorService scheduler, String host, int port) {
    this.factory = factory;
    this.scheduler = scheduler;
    channelRef = new AtomicReference<Channel>(null);
    closing = false;
    pendings = new AtomicLong(0);
    flushCheckTimer = new AtomicLong(Long.MAX_VALUE);

    // Configure
    buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
    max_retries = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
    base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
    max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
    retryPolicy = new StormBoundedExponentialBackoffRetry(base_sleep_ms, max_sleep_ms, max_retries);

    this.messageBatchSize = Utils.getInt(storm_conf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
    flushCheckInterval = Utils.getInt(storm_conf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10); // default 10 ms

    LOG.info("New Netty Client, connect to " + host + ", " + port
            + ", config: " + ", buffer_size: " + buffer_size);

    bootstrap = new ClientBootstrap(factory);
    bootstrap.setOption("tcpNoDelay", true);
    bootstrap.setOption("sendBufferSize", buffer_size);
    bootstrap.setOption("keepAlive", true);

    // Set up the pipeline factory.
    bootstrap.setPipelineFactory(new StormClientPipelineFactory(this));

    // Start the connection attempt.
    remote_addr = new InetSocketAddress(host, port);

    // Set up the connection asynchronously so the constructor does not block on the network.
    scheduler.execute(new Runnable() {
        @Override
        public void run() {
            connect();
        }
    });

    Runnable flusher = new Runnable() {
        @Override
        public void run() {
            if (!closing) {
                long flushCheckTime = flushCheckTimer.get();
                long now = System.currentTimeMillis();
                // Only flush once the deadline set by send() has passed.
                if (now > flushCheckTime) {
                    Channel channel = channelRef.get();
                    if (null != channel && channel.isWritable()) {
                        flush(channel);
                    }
                }
            }
        }
    };

    // Max wait is 30s. Widen to long BEFORE multiplying: an int product of two large
    // configuration values could overflow and yield a bogus (even negative) delay.
    long initialDelay = Math.min(30L * 1000, (long) max_sleep_ms * max_retries);
    scheduler.scheduleWithFixedDelay(flusher, initialDelay, flushCheckInterval, TimeUnit.MILLISECONDS);
}
/**
 * Connects to the remote address, retrying with the configured back-off policy.
 *
 * Returns immediately when the current channel is already connected. Otherwise
 * up to max_retries + 1 connection attempts are made, sleeping between attempts
 * for the time given by the retry policy. On success the new channel is stored
 * in channelRef. When every attempt fails the client is closed and an exception
 * is thrown.
 *
 * @throws RuntimeException if the remote address is not reachable, or if the
 *         thread is interrupted while backing off (the interrupt flag is restored)
 */
private synchronized void connect() {
    Channel existing = channelRef.get();
    if (existing != null && existing.isConnected()) {
        return;
    }

    // Track the newly connected channel separately from the existing one: the existing
    // channel may be non-null but disconnected, and must not be mistaken for a
    // successful reconnect when every attempt below fails.
    Channel connected = null;
    try {
        int tried = 0;
        while (tried <= max_retries) {
            LOG.info("Reconnect started for {}... [{}]", name(), tried);
            LOG.debug("connection started...");

            ChannelFuture future = bootstrap.connect(remote_addr);
            future.awaitUninterruptibly();
            Channel current = future.getChannel();
            if (future.isSuccess()) {
                connected = current;
                break;
            }
            // Failed attempt: release whatever channel object was created for it.
            if (null != current) {
                current.close();
            }
            // No retry follows the final attempt, so do not sleep after it.
            if (tried < max_retries) {
                Thread.sleep(retryPolicy.getSleepTimeMs(tried, 0));
            }
            tried++;
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new RuntimeException("connection failed " + name(), e);
    }

    if (null != connected) {
        LOG.info("connection established to a remote host " + name() + ", " + connected.toString());
        channelRef.set(connected);
    } else {
        close();
        throw new RuntimeException("Remote address is not reachable. We will close this client " + name());
    }
}
/**
 * Queues the given task messages for delivery to the server.
 *
 * Messages are appended to the current batch; every time the batch reports
 * itself full it is written to the channel. Whatever remains afterwards is
 * written at once if the channel is writable, otherwise the flush deadline is
 * armed so the periodic flusher can send it later.
 *
 * @param msgs messages to send; null or an exhausted iterator is a no-op
 * @throws RuntimeException if the client is already closing
 */
synchronized public void send(Iterator<TaskMessage> msgs) {
    // A closing client refuses new work.
    if (closing) {
        throw new RuntimeException("Client is being closed, and does not take requests any more");
    }
    if (msgs == null || !msgs.hasNext()) {
        return;
    }

    Channel active = channelRef.get();
    if (active == null) {
        connect();
        active = channelRef.get();
    }

    while (msgs.hasNext()) {
        // Re-establish the connection if it dropped while we were batching.
        if (!active.isConnected()) {
            connect();
            active = channelRef.get();
        }

        TaskMessage next = msgs.next();
        if (messageBatch == null) {
            messageBatch = new MessageBatch(messageBatchSize);
        }
        messageBatch.add(next);

        if (messageBatch.isFull()) {
            MessageBatch full = messageBatch;
            flushRequest(active, full);
            messageBatch = null;
        }
    }

    // Nothing left over: done.
    if (messageBatch == null || messageBatch.isEmpty()) {
        return;
    }

    if (!active.isWritable()) {
        // Netty's internal buffer is full; keep buffering and let the flusher
        // pick the batch up after the check interval.
        flushCheckTimer.set(System.currentTimeMillis() + flushCheckInterval);
        return;
    }

    // Channel can take more data: send the remainder right away to keep latency low.
    flushCheckTimer.set(Long.MAX_VALUE);
    MessageBatch remainder = messageBatch;
    messageBatch = null;
    flushRequest(active, remainder);
}
/**
 * Returns a display name for this client.
 *
 * @return PREFIX followed by the remote address, or an empty string when no
 *         remote address is set
 */
public String name() {
    if (remote_addr == null) {
        return "";
    }
    String address = remote_addr.toString();
    return PREFIX + address;
}
/**
 * Writes the currently buffered batch, if any, to the given channel and
 * disarms the flush deadline. Does nothing once the client is closing.
 *
 * @param channel channel the buffered batch is written to
 */
private synchronized void flush(Channel channel) {
    if (closing) {
        return;
    }
    if (messageBatch == null || messageBatch.isEmpty()) {
        return;
    }
    MessageBatch buffered = messageBatch;
    flushCheckTimer.set(Long.MAX_VALUE);
    flushRequest(channel, buffered);
    messageBatch = null;
}
/**
 * Gracefully closes this client.
 *
 * Marks the client as closing, writes any buffered batch to the current channel
 * (if there is one), then waits until no written batch is still pending, polling
 * once per second for at most 600 seconds, before invoking close_n_release().
 * Calling it again after the first call is a no-op.
 *
 * If the calling thread is interrupted while waiting, the wait ends early, the
 * channel is still released, and the thread's interrupt flag is left set.
 */
public synchronized void close() {
    if (closing) {
        return;
    }
    closing = true;
    LOG.info("Closing Netty Client " + name());

    if (null != messageBatch && !messageBatch.isEmpty()) {
        MessageBatch toBeFlushed = messageBatch;
        Channel channel = channelRef.get();
        if (channel != null) {
            flushRequest(channel, toBeFlushed);
        }
        messageBatch = null;
    }

    // Wait for pending writes to complete.
    final long timeoutMilliSeconds = 600 * 1000; //600 seconds
    final long start = System.currentTimeMillis();

    LOG.info("Waiting for pending batchs to be sent with "+ name() + "..., timeout: {}ms, pendings: {}", timeoutMilliSeconds, pendings.get());

    while (pendings.get() != 0) {
        try {
            long delta = System.currentTimeMillis() - start;
            if (delta > timeoutMilliSeconds) {
                LOG.error("Timeout when sending pending batchs with {}..., there are still {} pending batchs not sent", name(), pendings.get());
                break;
            }
            Thread.sleep(1000); //sleep 1s
        } catch (InterruptedException e) {
            // Stop waiting, but restore the interrupt flag instead of swallowing it
            // so the caller can still see that the thread was interrupted.
            Thread.currentThread().interrupt();
            break;
        }
    }

    close_n_release();
}
/**
 * Closes the current channel, if there is one. Invoked by close() once it has
 * finished waiting for pending batches.
 */
private void close_n_release() {
    // Read the reference exactly once: the write-failure listener installed by
    // flushRequest() can set channelRef to null at any time, so a second get()
    // after the null check could return null and cause a NullPointerException.
    Channel channel = channelRef.get();
    if (channel != null) {
        channel.close();
        LOG.debug("channel {} closed",remote_addr);
    }
}
/**
 * Not supported: a client connection only sends.
 *
 * @param flags    ignored
 * @param clientId ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always (a RuntimeException subclass, so
 *         callers that catch RuntimeException are unaffected)
 */
@Override
public Iterator<TaskMessage> recv(int flags, int clientId) {
    throw new UnsupportedOperationException("Client connection should not receive any messages");
}
/**
 * Sends a single payload to the given task by wrapping it in a one-element
 * list and delegating to send(Iterator).
 *
 * @param taskId  destination task id
 * @param payload message bytes
 */
@Override
public void send(int taskId, byte[] payload) {
    List<TaskMessage> single = new ArrayList<TaskMessage>(1);
    single.add(new TaskMessage(taskId, payload));
    Iterator<TaskMessage> only = single.iterator();
    send(only);
}
/**
 * Writes one batch to the channel and tracks it as pending until the write
 * future completes.
 *
 * When the write fails, the failure is logged, the channel the future belongs
 * to is closed, and channelRef is cleared if it still refers to that channel.
 *
 * @param channel  channel to write to
 * @param requests batch to send; null is ignored
 */
private void flushRequest(Channel channel, final MessageBatch requests) {
    if (null == requests) {
        return;
    }

    ChannelFutureListener onWriteDone = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture result) throws Exception {
            // The batch is no longer pending, whether it was delivered or not.
            pendings.decrementAndGet();

            if (result.isSuccess()) {
                LOG.debug("{} request(s) sent", requests.size());
                return;
            }

            LOG.info(
                    "failed to send requests to " + remote_addr.toString() + ": ", result.getCause());
            Channel failed = result.getChannel();
            if (failed != null) {
                failed.close();
                // Clear the reference only if it still refers to the failed channel.
                channelRef.compareAndSet(failed, null);
            }
        }
    };

    pendings.incrementAndGet();
    ChannelFuture writeFuture = channel.write(requests);
    writeFuture.addListener(onWriteDone);
}
}