blob: 9b5806323fc3370a43f96c007b7d59148b312f63 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.message.netty;
import backtype.storm.Config;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.utils.IntervalCheck;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.TimeUtils;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
class NettyClientAsync extends NettyClient {
private static final Logger LOG = LoggerFactory.getLogger(NettyClientAsync.class);
public static final String PREFIX = "Netty-Client-";
// when batch buffer size is more than BATCH_THREASHOLD_WARN
// it will block Drainer thread
protected long BATCH_THREASHOLD_WARN;
protected final boolean directlySend;
protected AtomicBoolean flush_later;
protected int flushCheckInterval;
protected final boolean blockSend;
boolean isDirectSend(Map conf) {
if (JStormServerUtils.isOnePending(conf) == true) {
return true;
}
return !ConfigExtension.isNettyTransferAsyncBatch(conf);
}
boolean isBlockSend(Map storm_conf) {
if (ConfigExtension.isTopologyContainAcker(storm_conf) == false) {
return false;
}
return ConfigExtension.isNettyASyncBlock(storm_conf);
}
@SuppressWarnings("rawtypes")
NettyClientAsync(Map storm_conf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port, ReconnectRunnable reconnector) {
super(storm_conf, factory, scheduler, host, port, reconnector);
BATCH_THREASHOLD_WARN = ConfigExtension.getNettyBufferThresholdSize(storm_conf);
blockSend = isBlockSend(storm_conf);
directlySend = isDirectSend(storm_conf);
flush_later = new AtomicBoolean(false);
flushCheckInterval = Utils.getInt(storm_conf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10);
Runnable flusher = new Runnable() {
@Override
public void run() {
flush();
}
};
long initialDelay = Math.min(1000, max_sleep_ms * max_retries);
scheduler.scheduleAtFixedRate(flusher, initialDelay, flushCheckInterval, TimeUnit.MILLISECONDS);
clientChannelFactory = factory;
start();
LOG.info(this.toString());
}
/**
* Enqueue a task message to be sent to server
*/
@Override
public synchronized void send(List<TaskMessage> messages) {
// throw exception if the client is being closed
if (isClosed()) {
LOG.warn("Client is being closed, and does not take requests any more");
return;
}
long start = System.nanoTime();
try {
pushBatch(messages);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
long end = System.nanoTime();
if (sendTimer != null) {
sendTimer.update((end - start) / TimeUtils.NS_PER_US);
}
}
}
@Override
public synchronized void send(TaskMessage message) {
// throw exception if the client is being closed
if (isClosed()) {
LOG.warn("Client is being closed, and does not take requests any more");
return;
}
long start = System.nanoTime();
try {
pushBatch(message);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
long end = System.nanoTime();
if (sendTimer != null) {
sendTimer.update((end - start) / TimeUtils.NS_PER_US);
}
}
}
void waitChannelReady(long cachedSize, long sleepMs) {
long begin = System.currentTimeMillis();
boolean changeThreadhold = false;
IntervalCheck oneSecond = new IntervalCheck();
IntervalCheck timeoutIntervalCheck = new IntervalCheck();
timeoutIntervalCheck.setIntervalMs(timeoutMs);
sleepMs = (sleepMs < timeoutMs) ? sleepMs : timeoutMs;
while (isChannelReady() == null) {
long now = System.currentTimeMillis();
long delt = now - begin;
if (oneSecond.check() == true) {
LOG.warn("Target server {} is unavailable, pending {}, bufferSize {}, block sending {}ms", name, pendings.get(), cachedSize, delt);
}
if (timeoutIntervalCheck.check() == true) {
if (messageBatchRef.get() != null) {
LOG.warn("Target server {} is unavailable, wait too much time, throw timeout message", name);
messageBatchRef.set(null);
}
setChannel(null);
LOG.warn("Reset channel as null");
if (blockSend == false) {
reconnect();
break;
}
}
reconnect();
JStormUtils.sleepMs(sleepMs);
if (delt > 2 * timeoutMs * 1000L && changeThreadhold == false) {
if (channelRef.get() != null && BATCH_THREASHOLD_WARN >= 2 * messageBatchSize) {
// it is just channel isn't writable;
BATCH_THREASHOLD_WARN = BATCH_THREASHOLD_WARN / 2;
LOG.info("Reduce BATCH_THREASHOLD_WARN to {}", BATCH_THREASHOLD_WARN);
changeThreadhold = true;
}
}
if (isClosed()) {
LOG.info("Channel has been closed " + name());
break;
}
}
}
long getDelaySec(long cachedSize) {
long count = cachedSize / BATCH_THREASHOLD_WARN;
long sleepMs = (long) (Math.pow(2, count) * 10);
if (sleepMs > 1000) {
sleepMs = 1000;
}
return sleepMs;
}
void handleFailedChannel(MessageBatch messageBatch) {
messageBatchRef.set(messageBatch);
flush_later.set(true);
long cachedSize = messageBatch.getEncoded_length();
if (cachedSize > BATCH_THREASHOLD_WARN) {
long sleepMs = getDelaySec(cachedSize);
waitChannelReady(cachedSize, sleepMs);
}
return;
}
void pushBatch(List<TaskMessage> messages) {
if (messages.isEmpty()) {
return;
}
MessageBatch messageBatch = messageBatchRef.getAndSet(null);
if (null == messageBatch) {
messageBatch = new MessageBatch(messageBatchSize);
}
for (TaskMessage message : messages) {
if (TaskMessage.isEmpty(message)) {
continue;
}
messageBatch.add(message);
if (messageBatch.isFull()) {
Channel channel = isChannelReady();
if (channel != null) {
flushRequest(channel, messageBatch);
messageBatch = new MessageBatch(messageBatchSize);
}
}
}
Channel channel = isChannelReady();
if (channel == null) {
handleFailedChannel(messageBatch);
return;
} else if (messageBatch.isEmpty() == false) {
flushRequest(channel, messageBatch);
}
return;
}
void pushBatch(TaskMessage message) {
if (TaskMessage.isEmpty(message)) {
return;
}
MessageBatch messageBatch = messageBatchRef.getAndSet(null);
if (null == messageBatch) {
messageBatch = new MessageBatch(messageBatchSize);
}
messageBatch.add(message);
Channel channel = isChannelReady();
if (channel == null) {
handleFailedChannel(messageBatch);
return;
}
if (messageBatch.isFull()) {
flushRequest(channel, messageBatch);
return;
}
if (directlySend) {
flushRequest(channel, messageBatch);
} else {
if (messageBatchRef.compareAndSet(null, messageBatch)) {
flush_later.set(true);
} else
LOG.error("MessageBatch will be lost. This should not happen.");
}
}
void flush() {
if (isClosed() == true) {
return;
}
if (flush_later.get() == false) {
return;
}
Channel channel = isChannelReady();
if (channel == null) {
return;
}
flush_later.set(false);
MessageBatch toBeFlushed = messageBatchRef.getAndSet(null);
flushRequest(channel, toBeFlushed);
}
@Override
Channel isChannelReady() {
Channel channel = super.isChannelReady();
if (channel == null) {
return null;
}
if (blockSend == true && pendings.get() >= MAX_SEND_PENDING) {
return null;
}
return channel;
}
@Override
public void handleResponse() {
// do nothing
}
@Override
public String toString() {
return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
}